mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 19:40:39 +00:00
Compare commits
23 Commits
bodobolero
...
arpad/temp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2a84ea625c | ||
|
|
3a098a4eb2 | ||
|
|
ea1a21293d | ||
|
|
94ef73db85 | ||
|
|
9aa0cdee83 | ||
|
|
43ab1935f3 | ||
|
|
1edf84d6c7 | ||
|
|
a64711dec8 | ||
|
|
bb019f2bbb | ||
|
|
ab751abdbd | ||
|
|
5df8284961 | ||
|
|
f4150614d0 | ||
|
|
38dbc5f67f | ||
|
|
3685ad606d | ||
|
|
76a7d37f7e | ||
|
|
cdb6479c8a | ||
|
|
81c557d87e | ||
|
|
e963129678 | ||
|
|
4f0a9fc569 | ||
|
|
81c6a5a796 | ||
|
|
8e05639dbf | ||
|
|
deed46015d | ||
|
|
532d9b646e |
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -963,7 +963,7 @@ jobs:
|
||||
fi
|
||||
|
||||
- name: Verify docker-compose example and test extensions
|
||||
timeout-minutes: 20
|
||||
timeout-minutes: 60
|
||||
env:
|
||||
TAG: >-
|
||||
${{
|
||||
|
||||
21
Cargo.lock
generated
21
Cargo.lock
generated
@@ -3794,6 +3794,16 @@ version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
|
||||
|
||||
[[package]]
|
||||
name = "neon-shmem"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"nix 0.30.1",
|
||||
"tempfile",
|
||||
"thiserror 1.0.69",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "never-say-never"
|
||||
version = "6.6.666"
|
||||
@@ -4424,6 +4434,16 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pageserver_page_api"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"prost 0.13.3",
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "papaya"
|
||||
version = "0.2.1"
|
||||
@@ -8482,6 +8502,7 @@ dependencies = [
|
||||
"log",
|
||||
"memchr",
|
||||
"nix 0.26.4",
|
||||
"nix 0.30.1",
|
||||
"nom",
|
||||
"num",
|
||||
"num-bigint",
|
||||
|
||||
@@ -9,6 +9,7 @@ members = [
|
||||
"pageserver/ctl",
|
||||
"pageserver/client",
|
||||
"pageserver/pagebench",
|
||||
"pageserver/page_api",
|
||||
"proxy",
|
||||
"safekeeper",
|
||||
"safekeeper/client",
|
||||
@@ -23,6 +24,7 @@ members = [
|
||||
"libs/postgres_ffi",
|
||||
"libs/safekeeper_api",
|
||||
"libs/desim",
|
||||
"libs/neon-shmem",
|
||||
"libs/utils",
|
||||
"libs/consumption_metrics",
|
||||
"libs/postgres_backend",
|
||||
@@ -127,7 +129,7 @@ md5 = "0.7.0"
|
||||
measured = { version = "0.0.22", features=["lasso"] }
|
||||
measured-process = { version = "0.0.22" }
|
||||
memoffset = "0.9"
|
||||
nix = { version = "0.30.1", features = ["dir", "fs", "process", "socket", "signal", "poll"] }
|
||||
nix = { version = "0.30.1", features = ["dir", "fs", "mman", "process", "socket", "signal", "poll"] }
|
||||
# Do not update to >= 7.0.0, at least. The update will have a significant impact
|
||||
# on compute startup metrics (start_postgres_ms), >= 25% degradation.
|
||||
notify = "6.0.0"
|
||||
@@ -251,6 +253,7 @@ pageserver = { path = "./pageserver" }
|
||||
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
|
||||
pageserver_client = { path = "./pageserver/client" }
|
||||
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
|
||||
pageserver_page_api = { path = "./pageserver/page_api" }
|
||||
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
|
||||
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
|
||||
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
|
||||
|
||||
@@ -144,7 +144,6 @@ RUN set -e \
|
||||
openssh-client \
|
||||
parallel \
|
||||
pkg-config \
|
||||
sudo \
|
||||
unzip \
|
||||
wget \
|
||||
xz-utils \
|
||||
|
||||
@@ -7,7 +7,7 @@ index 255e616..1c6edb7 100644
|
||||
RelationGetRelationName(index));
|
||||
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_start_unlogged_build(index->rd_smgr);
|
||||
+ smgr_start_unlogged_build(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
initRumState(&buildstate.rumstate, index);
|
||||
@@ -18,7 +18,7 @@ index 255e616..1c6edb7 100644
|
||||
rumUpdateStats(index, &buildstate.buildStats, buildstate.rumstate.isBuild);
|
||||
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_finish_unlogged_build_phase_1(index->rd_smgr);
|
||||
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
/*
|
||||
@@ -29,7 +29,7 @@ index 255e616..1c6edb7 100644
|
||||
}
|
||||
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_end_unlogged_build(index->rd_smgr);
|
||||
+ smgr_end_unlogged_build(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
/*
|
||||
|
||||
@@ -462,6 +462,8 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
if var(REAL_S3_ENV).is_ok() {
|
||||
assert!(body.contains("remote_storage_s3_deleted_objects_total"));
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
assert!(body.contains("process_threads"));
|
||||
}
|
||||
|
||||
|
||||
13
libs/neon-shmem/Cargo.toml
Normal file
13
libs/neon-shmem/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "neon-shmem"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
thiserror.workspace = true
|
||||
nix.workspace=true
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
[target.'cfg(target_os = "macos")'.dependencies]
|
||||
tempfile = "3.14.0"
|
||||
418
libs/neon-shmem/src/lib.rs
Normal file
418
libs/neon-shmem/src/lib.rs
Normal file
@@ -0,0 +1,418 @@
|
||||
//! Shared memory utilities for neon communicator
|
||||
|
||||
use std::num::NonZeroUsize;
|
||||
use std::os::fd::{AsFd, BorrowedFd, OwnedFd};
|
||||
use std::ptr::NonNull;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use nix::errno::Errno;
|
||||
use nix::sys::mman::MapFlags;
|
||||
use nix::sys::mman::ProtFlags;
|
||||
use nix::sys::mman::mmap as nix_mmap;
|
||||
use nix::sys::mman::munmap as nix_munmap;
|
||||
use nix::unistd::ftruncate as nix_ftruncate;
|
||||
|
||||
/// ShmemHandle represents a shared memory area that can be shared by processes over fork().
|
||||
/// Unlike shared memory allocated by Postgres, this area is resizable, up to 'max_size' that's
|
||||
/// specified at creation.
|
||||
///
|
||||
/// The area is backed by an anonymous file created with memfd_create(). The full address space for
|
||||
/// 'max_size' is reserved up-front with mmap(), but whenever you call [`ShmemHandle::set_size`],
|
||||
/// the underlying file is resized. Do not access the area beyond the current size. Currently, that
|
||||
/// will cause the file to be expanded, but we might use mprotect() etc. to enforce that in the
|
||||
/// future.
|
||||
pub struct ShmemHandle {
|
||||
/// memfd file descriptor
|
||||
fd: OwnedFd,
|
||||
|
||||
max_size: usize,
|
||||
|
||||
// Pointer to the beginning of the shared memory area. The header is stored there.
|
||||
shared_ptr: NonNull<SharedStruct>,
|
||||
|
||||
// Pointer to the beginning of the user data
|
||||
pub data_ptr: NonNull<u8>,
|
||||
}
|
||||
|
||||
/// This is stored at the beginning in the shared memory area.
|
||||
struct SharedStruct {
|
||||
max_size: usize,
|
||||
|
||||
/// Current size of the backing file. The high-order bit is used for the RESIZE_IN_PROGRESS flag
|
||||
current_size: AtomicUsize,
|
||||
}
|
||||
|
||||
const RESIZE_IN_PROGRESS: usize = 1 << 63;
|
||||
|
||||
const HEADER_SIZE: usize = std::mem::size_of::<SharedStruct>();
|
||||
|
||||
/// Error type returned by the ShmemHandle functions.
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
#[error("{msg}: {errno}")]
|
||||
pub struct Error {
|
||||
pub msg: String,
|
||||
pub errno: Errno,
|
||||
}
|
||||
|
||||
impl Error {
|
||||
fn new(msg: &str, errno: Errno) -> Error {
|
||||
Error {
|
||||
msg: msg.to_string(),
|
||||
errno,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ShmemHandle {
|
||||
/// Create a new shared memory area. To communicate between processes, the processes need to be
|
||||
/// fork()'d after calling this, so that the ShmemHandle is inherited by all processes.
|
||||
///
|
||||
/// If the ShmemHandle is dropped, the memory is unmapped from the current process. Other
|
||||
/// processes can continue using it, however.
|
||||
pub fn new(name: &str, initial_size: usize, max_size: usize) -> Result<ShmemHandle, Error> {
|
||||
// create the backing anonymous file.
|
||||
let fd = create_backing_file(name)?;
|
||||
|
||||
Self::new_with_fd(fd, initial_size, max_size)
|
||||
}
|
||||
|
||||
fn new_with_fd(
|
||||
fd: OwnedFd,
|
||||
initial_size: usize,
|
||||
max_size: usize,
|
||||
) -> Result<ShmemHandle, Error> {
|
||||
// We reserve the high-order bit for the RESIZE_IN_PROGRESS flag, and the actual size
|
||||
// is a little larger than this because of the SharedStruct header. Make the upper limit
|
||||
// somewhat smaller than that, because with anything close to that, you'll run out of
|
||||
// memory anyway.
|
||||
if max_size >= 1 << 48 {
|
||||
panic!("max size {} too large", max_size);
|
||||
}
|
||||
if initial_size > max_size {
|
||||
panic!("initial size {initial_size} larger than max size {max_size}");
|
||||
}
|
||||
|
||||
// The actual initial / max size is the one given by the caller, plus the size of
|
||||
// 'SharedStruct'.
|
||||
let initial_size = HEADER_SIZE + initial_size;
|
||||
let max_size = NonZeroUsize::new(HEADER_SIZE + max_size).unwrap();
|
||||
|
||||
// Reserve address space for it with mmap
|
||||
//
|
||||
// TODO: Use MAP_HUGETLB if possible
|
||||
let start_ptr = unsafe {
|
||||
nix_mmap(
|
||||
None,
|
||||
max_size,
|
||||
ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
|
||||
MapFlags::MAP_SHARED,
|
||||
&fd,
|
||||
0,
|
||||
)
|
||||
}
|
||||
.map_err(|e| Error::new("mmap failed: {e}", e))?;
|
||||
|
||||
// Reserve space for the initial size
|
||||
enlarge_file(fd.as_fd(), initial_size as u64)?;
|
||||
|
||||
// Initialize the header
|
||||
let shared: NonNull<SharedStruct> = start_ptr.cast();
|
||||
unsafe {
|
||||
shared.write(SharedStruct {
|
||||
max_size: max_size.into(),
|
||||
current_size: AtomicUsize::new(initial_size),
|
||||
})
|
||||
};
|
||||
|
||||
// The user data begins after the header
|
||||
let data_ptr = unsafe { start_ptr.cast().add(HEADER_SIZE) };
|
||||
|
||||
Ok(ShmemHandle {
|
||||
fd,
|
||||
max_size: max_size.into(),
|
||||
shared_ptr: shared,
|
||||
data_ptr,
|
||||
})
|
||||
}
|
||||
|
||||
// return reference to the header
|
||||
fn shared(&self) -> &SharedStruct {
|
||||
unsafe { self.shared_ptr.as_ref() }
|
||||
}
|
||||
|
||||
/// Resize the shared memory area. 'new_size' must not be larger than the 'max_size' specified
|
||||
/// when creating the area.
|
||||
///
|
||||
/// This may only be called from one process/thread concurrently. We detect that case
|
||||
/// and return an Error.
|
||||
pub fn set_size(&self, new_size: usize) -> Result<(), Error> {
|
||||
let new_size = new_size + HEADER_SIZE;
|
||||
let shared = self.shared();
|
||||
|
||||
if new_size > self.max_size {
|
||||
panic!(
|
||||
"new size ({} is greater than max size ({})",
|
||||
new_size, self.max_size
|
||||
);
|
||||
}
|
||||
assert_eq!(self.max_size, shared.max_size);
|
||||
|
||||
// Lock the area by setting the bit in 'current_size'
|
||||
//
|
||||
// Ordering::Relaxed would probably be sufficient here, as we don't access any other memory
|
||||
// and the posix_fallocate/ftruncate call is surely a synchronization point anyway. But
|
||||
// since this is not performance-critical, better safe than sorry .
|
||||
let mut old_size = shared.current_size.load(Ordering::Acquire);
|
||||
loop {
|
||||
if (old_size & RESIZE_IN_PROGRESS) != 0 {
|
||||
return Err(Error::new(
|
||||
"concurrent resize detected",
|
||||
Errno::UnknownErrno,
|
||||
));
|
||||
}
|
||||
match shared.current_size.compare_exchange(
|
||||
old_size,
|
||||
new_size,
|
||||
Ordering::Acquire,
|
||||
Ordering::Relaxed,
|
||||
) {
|
||||
Ok(_) => break,
|
||||
Err(x) => old_size = x,
|
||||
}
|
||||
}
|
||||
|
||||
// Ok, we got the lock.
|
||||
//
|
||||
// NB: If anything goes wrong, we *must* clear the bit!
|
||||
let result = {
|
||||
use std::cmp::Ordering::{Equal, Greater, Less};
|
||||
match new_size.cmp(&old_size) {
|
||||
Less => nix_ftruncate(&self.fd, new_size as i64).map_err(|e| {
|
||||
Error::new("could not shrink shmem segment, ftruncate failed: {e}", e)
|
||||
}),
|
||||
Equal => Ok(()),
|
||||
Greater => enlarge_file(self.fd.as_fd(), new_size as u64),
|
||||
}
|
||||
};
|
||||
|
||||
// Unlock
|
||||
shared.current_size.store(
|
||||
if result.is_ok() { new_size } else { old_size },
|
||||
Ordering::Release,
|
||||
);
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Returns the current user-visible size of the shared memory segment.
|
||||
///
|
||||
/// NOTE: a concurrent set_size() call can change the size at any time. It is the caller's
|
||||
/// responsibility not to access the area beyond the current size.
|
||||
pub fn current_size(&self) -> usize {
|
||||
let total_current_size =
|
||||
self.shared().current_size.load(Ordering::Relaxed) & !RESIZE_IN_PROGRESS;
|
||||
total_current_size - HEADER_SIZE
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ShmemHandle {
|
||||
fn drop(&mut self) {
|
||||
// SAFETY: The pointer was obtained from mmap() with the given size.
|
||||
// We unmap the entire region.
|
||||
let _ = unsafe { nix_munmap(self.shared_ptr.cast(), self.max_size) };
|
||||
// The fd is dropped automatically by OwnedFd.
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a "backing file" for the shared memory area. On Linux, use memfd_create(), to create an
|
||||
/// anonymous in-memory file. One macos, fall back to a regular file. That's good enough for
|
||||
/// development and testing, but in production we want the file to stay in memory.
|
||||
///
|
||||
/// disable 'unused_variables' warnings, because in the macos path, 'name' is unused.
|
||||
#[allow(unused_variables)]
|
||||
fn create_backing_file(name: &str) -> Result<OwnedFd, Error> {
|
||||
#[cfg(not(target_os = "macos"))]
|
||||
{
|
||||
nix::sys::memfd::memfd_create(name, nix::sys::memfd::MFdFlags::empty())
|
||||
.map_err(|e| Error::new("memfd_create failed: {e}", e))
|
||||
}
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
let file = tempfile::tempfile().map_err(|e| {
|
||||
Error::new(
|
||||
"could not create temporary file to back shmem area: {e}",
|
||||
nix::errno::Errno::from_raw(e.raw_os_error().unwrap_or(0)),
|
||||
)
|
||||
})?;
|
||||
Ok(OwnedFd::from(file))
|
||||
}
|
||||
}
|
||||
|
||||
fn enlarge_file(fd: BorrowedFd, size: u64) -> Result<(), Error> {
|
||||
// Use posix_fallocate() to enlarge the file. It reserves the space correctly, so that
|
||||
// we don't get a segfault later when trying to actually use it.
|
||||
#[cfg(not(target_os = "macos"))]
|
||||
{
|
||||
nix::fcntl::posix_fallocate(fd, 0, size as i64).map_err(|e| {
|
||||
Error::new(
|
||||
"could not grow shmem segment, posix_fallocate failed: {e}",
|
||||
e,
|
||||
)
|
||||
})
|
||||
}
|
||||
// As a fallback on macos, which doesn't have posix_fallocate, use plain 'fallocate'
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
nix::unistd::ftruncate(fd, size as i64)
|
||||
.map_err(|e| Error::new("could not grow shmem segment, ftruncate failed: {e}", e))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use nix::unistd::ForkResult;
|
||||
use std::ops::Range;
|
||||
|
||||
/// check that all bytes in given range have the expected value.
|
||||
fn assert_range(ptr: *const u8, expected: u8, range: Range<usize>) {
|
||||
for i in range {
|
||||
let b = unsafe { *(ptr.add(i)) };
|
||||
assert_eq!(expected, b, "unexpected byte at offset {}", i);
|
||||
}
|
||||
}
|
||||
|
||||
/// Write 'b' to all bytes in the given range
|
||||
fn write_range(ptr: *mut u8, b: u8, range: Range<usize>) {
|
||||
unsafe { std::ptr::write_bytes(ptr.add(range.start), b, range.end - range.start) };
|
||||
}
|
||||
|
||||
// simple single-process test of growing and shrinking
|
||||
#[test]
|
||||
fn test_shmem_resize() -> Result<(), Error> {
|
||||
let max_size = 1024 * 1024;
|
||||
let init_struct = ShmemHandle::new("test_shmem_resize", 0, max_size)?;
|
||||
|
||||
assert_eq!(init_struct.current_size(), 0);
|
||||
|
||||
// Initial grow
|
||||
let size1 = 10000;
|
||||
init_struct.set_size(size1).unwrap();
|
||||
assert_eq!(init_struct.current_size(), size1);
|
||||
|
||||
// Write some data
|
||||
let data_ptr = init_struct.data_ptr.as_ptr();
|
||||
write_range(data_ptr, 0xAA, 0..size1);
|
||||
assert_range(data_ptr, 0xAA, 0..size1);
|
||||
|
||||
// Shrink
|
||||
let size2 = 5000;
|
||||
init_struct.set_size(size2).unwrap();
|
||||
assert_eq!(init_struct.current_size(), size2);
|
||||
|
||||
// Grow again
|
||||
let size3 = 20000;
|
||||
init_struct.set_size(size3).unwrap();
|
||||
assert_eq!(init_struct.current_size(), size3);
|
||||
|
||||
// Try to read it. The area that was shrunk and grown again should read as all zeros now
|
||||
assert_range(data_ptr, 0xAA, 0..5000);
|
||||
assert_range(data_ptr, 0, 5000..size1);
|
||||
|
||||
// Try to grow beyond max_size
|
||||
//let size4 = max_size + 1;
|
||||
//assert!(init_struct.set_size(size4).is_err());
|
||||
|
||||
// Dropping init_struct should unmap the memory
|
||||
drop(init_struct);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This is used in tests to coordinate between test processes. It's like std::sync::Barrier,
|
||||
/// but is stored in the shared memory area and works across processes. It's implemented by
|
||||
/// polling, because e.g. standard rust mutexes are not guaranteed to work across processes.
|
||||
struct SimpleBarrier {
|
||||
num_procs: usize,
|
||||
count: AtomicUsize,
|
||||
}
|
||||
|
||||
impl SimpleBarrier {
|
||||
unsafe fn init(ptr: *mut SimpleBarrier, num_procs: usize) {
|
||||
unsafe {
|
||||
*ptr = SimpleBarrier {
|
||||
num_procs,
|
||||
count: AtomicUsize::new(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wait(&self) {
|
||||
let old = self.count.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let generation = old / self.num_procs;
|
||||
|
||||
let mut current = old + 1;
|
||||
while current < (generation + 1) * self.num_procs {
|
||||
std::thread::sleep(std::time::Duration::from_millis(10));
|
||||
current = self.count.load(Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multi_process() {
|
||||
// Initialize
|
||||
let max_size = 1_000_000_000_000;
|
||||
let init_struct = ShmemHandle::new("test_multi_process", 0, max_size).unwrap();
|
||||
let ptr = init_struct.data_ptr.as_ptr();
|
||||
|
||||
// Store the SimpleBarrier in the first 1k of the area.
|
||||
init_struct.set_size(10000).unwrap();
|
||||
let barrier_ptr: *mut SimpleBarrier = unsafe {
|
||||
ptr.add(ptr.align_offset(std::mem::align_of::<SimpleBarrier>()))
|
||||
.cast()
|
||||
};
|
||||
unsafe { SimpleBarrier::init(barrier_ptr, 2) };
|
||||
let barrier = unsafe { barrier_ptr.as_ref().unwrap() };
|
||||
|
||||
// Fork another test process. The code after this runs in both processes concurrently.
|
||||
let fork_result = unsafe { nix::unistd::fork().unwrap() };
|
||||
|
||||
// In the parent, fill bytes between 1000..2000. In the child, between 2000..3000
|
||||
if fork_result.is_parent() {
|
||||
write_range(ptr, 0xAA, 1000..2000);
|
||||
} else {
|
||||
write_range(ptr, 0xBB, 2000..3000);
|
||||
}
|
||||
barrier.wait();
|
||||
// Verify the contents. (in both processes)
|
||||
assert_range(ptr, 0xAA, 1000..2000);
|
||||
assert_range(ptr, 0xBB, 2000..3000);
|
||||
|
||||
// Grow, from the child this time
|
||||
let size = 10_000_000;
|
||||
if !fork_result.is_parent() {
|
||||
init_struct.set_size(size).unwrap();
|
||||
}
|
||||
barrier.wait();
|
||||
|
||||
// make some writes at the end
|
||||
if fork_result.is_parent() {
|
||||
write_range(ptr, 0xAA, (size - 10)..size);
|
||||
} else {
|
||||
write_range(ptr, 0xBB, (size - 20)..(size - 10));
|
||||
}
|
||||
barrier.wait();
|
||||
|
||||
// Verify the contents. (This runs in both processes)
|
||||
assert_range(ptr, 0, (size - 1000)..(size - 20));
|
||||
assert_range(ptr, 0xBB, (size - 20)..(size - 10));
|
||||
assert_range(ptr, 0xAA, (size - 10)..size);
|
||||
|
||||
if let ForkResult::Parent { child } = fork_result {
|
||||
nix::sys::wait::waitpid(child, None).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -235,7 +235,7 @@ pub enum PageServiceProtocolPipelinedBatchingStrategy {
|
||||
ScatteredLsn,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case")]
|
||||
pub enum GetVectoredConcurrentIo {
|
||||
/// The read path is fully sequential: layers are visited
|
||||
|
||||
@@ -325,6 +325,7 @@ impl TimelineCreateRequest {
|
||||
match &self.mode {
|
||||
TimelineCreateRequestMode::Branch { .. } => "branch",
|
||||
TimelineCreateRequestMode::ImportPgdata { .. } => "import",
|
||||
TimelineCreateRequestMode::Template { .. } => "template",
|
||||
TimelineCreateRequestMode::Bootstrap { .. } => "bootstrap",
|
||||
}
|
||||
}
|
||||
@@ -406,6 +407,10 @@ pub enum TimelineCreateRequestMode {
|
||||
ImportPgdata {
|
||||
import_pgdata: TimelineCreateRequestModeImportPgdata,
|
||||
},
|
||||
Template {
|
||||
template_tenant_id: TenantShardId,
|
||||
template_timeline_id: TimelineId,
|
||||
},
|
||||
// NB: Bootstrap is all-optional, and thus the serde(untagged) will cause serde to stop at Bootstrap.
|
||||
// (serde picks the first matching enum variant, in declaration order).
|
||||
Bootstrap {
|
||||
|
||||
13
pageserver/page_api/Cargo.toml
Normal file
13
pageserver/page_api/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "pageserver_page_api"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
prost.workspace = true
|
||||
tonic.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build.workspace = true
|
||||
13
pageserver/page_api/build.rs
Normal file
13
pageserver/page_api/build.rs
Normal file
@@ -0,0 +1,13 @@
|
||||
use std::env;
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// Generates Rust code from .proto Protobuf schemas, along with a binary file
|
||||
/// descriptor set for Protobuf schema reflection.
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let out_dir = PathBuf::from(env::var("OUT_DIR")?);
|
||||
tonic_build::configure()
|
||||
.bytes(["."])
|
||||
.file_descriptor_set_path(out_dir.join("page_api_descriptor.bin"))
|
||||
.compile_protos(&["proto/page_service.proto"], &["proto"])
|
||||
.map_err(|err| err.into())
|
||||
}
|
||||
233
pageserver/page_api/proto/page_service.proto
Normal file
233
pageserver/page_api/proto/page_service.proto
Normal file
@@ -0,0 +1,233 @@
|
||||
// Page service, presented by pageservers for computes.
|
||||
//
|
||||
// This is the compute read path. It primarily serves page versions at given
|
||||
// LSNs, but also base backups, SLRU segments, and relation metadata.
|
||||
//
|
||||
// EXPERIMENTAL: this is still under development and subject to change.
|
||||
//
|
||||
// Request metadata headers:
|
||||
// - authorization: JWT token ("Bearer <token>"), if auth is enabled
|
||||
// - neon-tenant-id: tenant ID ("7c4a1f9e3bd6470c8f3e21a65bd2e980")
|
||||
// - neon-shard-id: shard ID, as <number><count> in hex ("0b10" = shard 11 of 16, 0-based)
|
||||
// - neon-timeline-id: timeline ID ("f08c4e9a2d5f76b1e3a7c2d8910f4b3e")
|
||||
//
|
||||
// The service can be accessed via e.g. grpcurl:
|
||||
//
|
||||
// ```
|
||||
// grpcurl \
|
||||
// -plaintext \
|
||||
// -H "neon-tenant-id: 7c4a1f9e3bd6470c8f3e21a65bd2e980" \
|
||||
// -H "neon-shard-id: 0b10" \
|
||||
// -H "neon-timeline-id: f08c4e9a2d5f76b1e3a7c2d8910f4b3e" \
|
||||
// -H "authorization: Bearer $JWT" \
|
||||
// -d '{"read_lsn": {"request_lsn": 1234567890}, "rel": {"spc_oid": 1663, "db_oid": 1234, "rel_number": 5678, "fork_number": 0}}'
|
||||
// localhost:51051 page_api.PageService/CheckRelExists
|
||||
// ```
|
||||
//
|
||||
// TODO: consider adding neon-compute-mode ("primary", "static", "replica").
|
||||
// However, this will require reconnecting when changing modes.
|
||||
//
|
||||
// TODO: write implementation guidance on
|
||||
// - Health checks
|
||||
// - Tracing, OpenTelemetry
|
||||
// - Compression
|
||||
|
||||
syntax = "proto3";
|
||||
package page_api;
|
||||
|
||||
service PageService {
|
||||
// Returns whether a relation exists.
|
||||
rpc CheckRelExists(CheckRelExistsRequest) returns (CheckRelExistsResponse);
|
||||
|
||||
// Fetches a base backup.
|
||||
rpc GetBaseBackup (GetBaseBackupRequest) returns (stream GetBaseBackupResponseChunk);
|
||||
|
||||
// Returns the total size of a database, as # of bytes.
|
||||
rpc GetDbSize (GetDbSizeRequest) returns (GetDbSizeResponse);
|
||||
|
||||
// Fetches pages.
|
||||
//
|
||||
// This is implemented as a bidirectional streaming RPC for performance. Unary
|
||||
// requests incur costs for e.g. HTTP/2 stream setup, header parsing,
|
||||
// authentication, and so on -- with streaming, we only pay these costs during
|
||||
// the initial stream setup. This ~doubles throughput in benchmarks. Other
|
||||
// RPCs use regular unary requests, since they are not as frequent and
|
||||
// performance-critical, and this simplifies implementation.
|
||||
//
|
||||
// NB: a status response (e.g. errors) will terminate the stream. The stream
|
||||
// may be shared by e.g. multiple Postgres backends, so we should avoid this.
|
||||
// Most errors are therefore sent as GetPageResponse.status instead.
|
||||
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
|
||||
|
||||
// Returns the size of a relation, as # of blocks.
|
||||
rpc GetRelSize (GetRelSizeRequest) returns (GetRelSizeResponse);
|
||||
|
||||
// Fetches an SLRU segment.
|
||||
rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse);
|
||||
}
|
||||
|
||||
// The LSN a request should read at.
|
||||
message ReadLsn {
|
||||
// The request's read LSN. Required.
|
||||
uint64 request_lsn = 1;
|
||||
// If given, the caller guarantees that the page has not been modified since
|
||||
// this LSN. Must be smaller than or equal to request_lsn. This allows the
|
||||
// Pageserver to serve an old page without waiting for the request LSN to
|
||||
// arrive. Valid for all request types.
|
||||
//
|
||||
// It is undefined behaviour to make a request such that the page was, in
|
||||
// fact, modified between request_lsn and not_modified_since_lsn. The
|
||||
// Pageserver might detect it and return an error, or it might return the old
|
||||
// page version or the new page version. Setting not_modified_since_lsn equal
|
||||
// to request_lsn is always safe, but can lead to unnecessary waiting.
|
||||
uint64 not_modified_since_lsn = 2;
|
||||
}
|
||||
|
||||
// A relation identifier.
|
||||
message RelTag {
|
||||
uint32 spc_oid = 1;
|
||||
uint32 db_oid = 2;
|
||||
uint32 rel_number = 3;
|
||||
uint32 fork_number = 4;
|
||||
}
|
||||
|
||||
// Checks whether a relation exists, at the given LSN. Only valid on shard 0,
|
||||
// other shards will error.
|
||||
message CheckRelExistsRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
RelTag rel = 2;
|
||||
}
|
||||
|
||||
message CheckRelExistsResponse {
|
||||
bool exists = 1;
|
||||
}
|
||||
|
||||
// Requests a base backup at a given LSN.
|
||||
message GetBaseBackupRequest {
|
||||
// The LSN to fetch a base backup at.
|
||||
ReadLsn read_lsn = 1;
|
||||
// If true, logical replication slots will not be created.
|
||||
bool replica = 2;
|
||||
}
|
||||
|
||||
// Base backup response chunk, returned as an ordered stream.
|
||||
message GetBaseBackupResponseChunk {
|
||||
// A basebackup data chunk. The size is undefined, but bounded by the 4 MB
|
||||
// gRPC message size limit.
|
||||
bytes chunk = 1;
|
||||
}
|
||||
|
||||
// Requests the size of a database, as # of bytes. Only valid on shard 0, other
|
||||
// shards will error.
|
||||
message GetDbSizeRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
uint32 db_oid = 2;
|
||||
}
|
||||
|
||||
message GetDbSizeResponse {
|
||||
uint64 num_bytes = 1;
|
||||
}
|
||||
|
||||
// Requests one or more pages.
|
||||
message GetPageRequest {
|
||||
// A request ID. Will be included in the response. Should be unique for
|
||||
// in-flight requests on the stream.
|
||||
uint64 request_id = 1;
|
||||
// The request class.
|
||||
GetPageClass request_class = 2;
|
||||
// The LSN to read at.
|
||||
ReadLsn read_lsn = 3;
|
||||
// The relation to read from.
|
||||
RelTag rel = 4;
|
||||
// Page numbers to read. Must belong to the remote shard.
|
||||
//
|
||||
// Multiple pages will be executed as a single batch by the Pageserver,
|
||||
// amortizing layer access costs and parallelizing them. This may increase the
|
||||
// latency of any individual request, but improves the overall latency and
|
||||
// throughput of the batch as a whole.
|
||||
//
|
||||
// TODO: this causes an allocation in the common single-block case. The sender
|
||||
// can use a SmallVec to stack-allocate it, but Prost will always deserialize
|
||||
// into a heap-allocated Vec. Consider optimizing this.
|
||||
//
|
||||
// TODO: we might be able to avoid a sort or something if we mandate that these
|
||||
// are always in order. But we can't currenly rely on this on the server, because
|
||||
// of compatibility with the libpq protocol handler.
|
||||
repeated uint32 block_number = 5;
|
||||
}
|
||||
|
||||
// A GetPageRequest class. Primarily intended for observability, but may also be
|
||||
// used for prioritization in the future.
|
||||
enum GetPageClass {
|
||||
// Unknown class. For forwards compatibility: used when the client sends a
|
||||
// class that the server doesn't know about.
|
||||
GET_PAGE_CLASS_UNKNOWN = 0;
|
||||
// A normal request. This is the default.
|
||||
GET_PAGE_CLASS_NORMAL = 1;
|
||||
// A prefetch request. NB: can only be classified on pg < 18.
|
||||
GET_PAGE_CLASS_PREFETCH = 2;
|
||||
// A background request (e.g. vacuum).
|
||||
GET_PAGE_CLASS_BACKGROUND = 3;
|
||||
}
|
||||
|
||||
// A GetPage response.
|
||||
//
|
||||
// A batch response will contain all of the requested pages. We could eagerly
|
||||
// emit individual pages as soon as they are ready, but on a readv() Postgres
|
||||
// holds buffer pool locks on all pages in the batch and we'll only return once
|
||||
// the entire batch is ready, so no one can make use of the individual pages.
|
||||
message GetPageResponse {
|
||||
// The original request's ID.
|
||||
uint64 request_id = 1;
|
||||
// The response status code.
|
||||
GetPageStatus status = 2;
|
||||
// A string describing the status, if any.
|
||||
string reason = 3;
|
||||
// The 8KB page images, in the same order as the request. Empty if status != OK.
|
||||
repeated bytes page_image = 4;
|
||||
}
|
||||
|
||||
// A GetPageResponse status code. Since we use a bidirectional stream, we don't
|
||||
// want to send errors as gRPC statuses, since this would terminate the stream.
|
||||
enum GetPageStatus {
|
||||
// Unknown status. For forwards compatibility: used when the server sends a
|
||||
// status code that the client doesn't know about.
|
||||
GET_PAGE_STATUS_UNKNOWN = 0;
|
||||
// The request was successful.
|
||||
GET_PAGE_STATUS_OK = 1;
|
||||
// The page did not exist. The tenant/timeline/shard has already been
|
||||
// validated during stream setup.
|
||||
GET_PAGE_STATUS_NOT_FOUND = 2;
|
||||
// The request was invalid.
|
||||
GET_PAGE_STATUS_INVALID = 3;
|
||||
// The tenant is rate limited. Slow down and retry later.
|
||||
GET_PAGE_STATUS_SLOW_DOWN = 4;
|
||||
// TODO: consider adding a GET_PAGE_STATUS_LAYER_DOWNLOAD in the case of a
|
||||
// layer download. This could free up the server task to process other
|
||||
// requests while the layer download is in progress.
|
||||
}
|
||||
|
||||
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on
|
||||
// shard 0, other shards will error.
|
||||
message GetRelSizeRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
RelTag rel = 2;
|
||||
}
|
||||
|
||||
message GetRelSizeResponse {
|
||||
uint32 num_blocks = 1;
|
||||
}
|
||||
|
||||
// Requests an SLRU segment. Only valid on shard 0, other shards will error.
|
||||
message GetSlruSegmentRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
uint32 kind = 2;
|
||||
uint32 segno = 3;
|
||||
}
|
||||
|
||||
// Returns an SLRU segment.
|
||||
//
|
||||
// These are up 32 pages (256 KB), so we can send them as a single response.
|
||||
message GetSlruSegmentResponse {
|
||||
bytes segment = 1;
|
||||
}
|
||||
19
pageserver/page_api/src/lib.rs
Normal file
19
pageserver/page_api/src/lib.rs
Normal file
@@ -0,0 +1,19 @@
|
||||
//! This crate provides the Pageserver's page API. It contains:
|
||||
//!
|
||||
//! * proto/page_service.proto: the Protobuf schema for the page API.
|
||||
//! * proto: auto-generated Protobuf types for gRPC.
|
||||
//!
|
||||
//! This crate is used by both the client and the server. Try to keep it slim.
|
||||
|
||||
// Code generated by protobuf.
|
||||
pub mod proto {
|
||||
tonic::include_proto!("page_api");
|
||||
|
||||
/// File descriptor set for Protobuf schema reflection. This allows using
|
||||
/// e.g. grpcurl with the API.
|
||||
pub const FILE_DESCRIPTOR_SET: &[u8] =
|
||||
tonic::include_file_descriptor_set!("page_api_descriptor");
|
||||
|
||||
pub use page_service_client::PageServiceClient;
|
||||
pub use page_service_server::{PageService, PageServiceServer};
|
||||
}
|
||||
@@ -144,7 +144,7 @@ where
|
||||
replica,
|
||||
ctx,
|
||||
io_concurrency: IoConcurrency::spawn_from_conf(
|
||||
timeline.conf,
|
||||
timeline.conf.get_vectored_concurrent_io,
|
||||
timeline
|
||||
.gate
|
||||
.enter()
|
||||
|
||||
@@ -629,6 +629,12 @@ paths:
|
||||
existing_initdb_timeline_id:
|
||||
type: string
|
||||
format: hex
|
||||
template_tenant_id:
|
||||
type: string
|
||||
format: hex
|
||||
template_timeline_id:
|
||||
type: string
|
||||
format: hex
|
||||
import_pgdata:
|
||||
$ref: "#/components/schemas/TimelineCreateRequestImportPgdata"
|
||||
responses:
|
||||
|
||||
@@ -607,6 +607,14 @@ async fn timeline_create_handler(
|
||||
}
|
||||
},
|
||||
}),
|
||||
TimelineCreateRequestMode::Template {
|
||||
template_tenant_id,
|
||||
template_timeline_id,
|
||||
} => tenant::CreateTimelineParams::Template(tenant::CreateTimelineParamsTemplate {
|
||||
new_timeline_id,
|
||||
template_tenant_id,
|
||||
template_timeline_id,
|
||||
}),
|
||||
};
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Error);
|
||||
@@ -3199,7 +3207,7 @@ async fn list_aux_files(
|
||||
.await?;
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
state.conf,
|
||||
state.conf.get_vectored_concurrent_io,
|
||||
timeline.gate.enter().map_err(|_| ApiError::Cancelled)?,
|
||||
);
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ use itertools::Itertools;
|
||||
use jsonwebtoken::TokenData;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::config::{
|
||||
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
|
||||
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
|
||||
PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
|
||||
};
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
@@ -331,10 +331,10 @@ async fn page_service_conn_main(
|
||||
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
|
||||
// and create the per-query context in process_query ourselves.
|
||||
let mut conn_handler = PageServerHandler::new(
|
||||
conf,
|
||||
tenant_manager,
|
||||
auth,
|
||||
pipelining_config,
|
||||
conf.get_vectored_concurrent_io,
|
||||
perf_span_fields,
|
||||
connection_ctx,
|
||||
cancel.clone(),
|
||||
@@ -371,7 +371,6 @@ async fn page_service_conn_main(
|
||||
}
|
||||
|
||||
struct PageServerHandler {
|
||||
conf: &'static PageServerConf,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
claims: Option<Claims>,
|
||||
|
||||
@@ -389,6 +388,7 @@ struct PageServerHandler {
|
||||
timeline_handles: Option<TimelineHandles>,
|
||||
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
|
||||
gate_guard: GateGuard,
|
||||
}
|
||||
@@ -844,17 +844,16 @@ impl BatchedFeMessage {
|
||||
impl PageServerHandler {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
perf_span_fields: ConnectionPerfSpanFields,
|
||||
connection_ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
gate_guard: GateGuard,
|
||||
) -> Self {
|
||||
PageServerHandler {
|
||||
conf,
|
||||
auth,
|
||||
claims: None,
|
||||
connection_ctx,
|
||||
@@ -862,6 +861,7 @@ impl PageServerHandler {
|
||||
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
|
||||
cancel,
|
||||
pipelining_config,
|
||||
get_vectored_concurrent_io,
|
||||
gate_guard,
|
||||
}
|
||||
}
|
||||
@@ -1278,7 +1278,7 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
#[instrument(level = tracing::Level::DEBUG, skip_all)]
|
||||
async fn pagesteam_handle_batched_message<IO>(
|
||||
async fn pagestream_handle_batched_message<IO>(
|
||||
&mut self,
|
||||
pgb_writer: &mut PostgresBackend<IO>,
|
||||
batch: BatchedFeMessage,
|
||||
@@ -1623,7 +1623,7 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.get_vectored_concurrent_io,
|
||||
match self.gate_guard.try_clone() {
|
||||
Ok(guard) => guard,
|
||||
Err(_) => {
|
||||
@@ -1733,7 +1733,7 @@ impl PageServerHandler {
|
||||
};
|
||||
|
||||
let result = self
|
||||
.pagesteam_handle_batched_message(
|
||||
.pagestream_handle_batched_message(
|
||||
pgb_writer,
|
||||
msg,
|
||||
io_concurrency.clone(),
|
||||
@@ -1909,7 +1909,7 @@ impl PageServerHandler {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
self.pagesteam_handle_batched_message(
|
||||
self.pagestream_handle_batched_message(
|
||||
pgb_writer,
|
||||
batch,
|
||||
io_concurrency.clone(),
|
||||
|
||||
@@ -586,7 +586,7 @@ impl Timeline {
|
||||
// scan directory listing (new), merge with the old results
|
||||
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
@@ -645,7 +645,7 @@ impl Timeline {
|
||||
);
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
@@ -885,7 +885,7 @@ impl Timeline {
|
||||
);
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
|
||||
@@ -864,6 +864,7 @@ impl Debug for SetStoppingError {
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum CreateTimelineParams {
|
||||
Bootstrap(CreateTimelineParamsBootstrap),
|
||||
Template(CreateTimelineParamsTemplate),
|
||||
Branch(CreateTimelineParamsBranch),
|
||||
ImportPgdata(CreateTimelineParamsImportPgdata),
|
||||
}
|
||||
@@ -875,6 +876,13 @@ pub(crate) struct CreateTimelineParamsBootstrap {
|
||||
pub(crate) pg_version: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct CreateTimelineParamsTemplate {
|
||||
pub(crate) new_timeline_id: TimelineId,
|
||||
pub(crate) template_tenant_id: TenantShardId,
|
||||
pub(crate) template_timeline_id: TimelineId,
|
||||
}
|
||||
|
||||
/// NB: See comment on [`CreateTimelineIdempotency::Branch`] for why there's no `pg_version` here.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct CreateTimelineParamsBranch {
|
||||
@@ -921,6 +929,7 @@ pub(crate) enum CreateTimelineIdempotency {
|
||||
ancestor_start_lsn: Lsn,
|
||||
},
|
||||
ImportPgdata(CreatingTimelineIdempotencyImportPgdata),
|
||||
Template(TenantShardId, TimelineId),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
@@ -2665,6 +2674,9 @@ impl TenantShard {
|
||||
CreateTimelineParams::ImportPgdata(params) => {
|
||||
self.create_timeline_import_pgdata(params, ctx).await?
|
||||
}
|
||||
CreateTimelineParams::Template(params) => {
|
||||
self.create_timeline_from_template(params, ctx).await?
|
||||
}
|
||||
};
|
||||
|
||||
// At this point we have dropped our guard on [`Self::timelines_creating`], and
|
||||
@@ -2729,6 +2741,109 @@ impl TenantShard {
|
||||
Ok(activated_timeline)
|
||||
}
|
||||
|
||||
async fn create_timeline_from_template(
|
||||
self: &Arc<Self>,
|
||||
params: CreateTimelineParamsTemplate,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CreateTimelineResult, CreateTimelineError> {
|
||||
let CreateTimelineParamsTemplate {
|
||||
new_timeline_id,
|
||||
template_tenant_id,
|
||||
template_timeline_id,
|
||||
} = params;
|
||||
|
||||
// 0. create a guard to prevent parallel creation attempts.
|
||||
let _timeline_create_guard = match self
|
||||
.start_creating_timeline(
|
||||
new_timeline_id,
|
||||
CreateTimelineIdempotency::Template(template_tenant_id, template_timeline_id),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
StartCreatingTimelineResult::CreateGuard(guard) => guard,
|
||||
StartCreatingTimelineResult::Idempotent(timeline) => {
|
||||
return Ok(CreateTimelineResult::Idempotent(timeline));
|
||||
}
|
||||
};
|
||||
|
||||
// 1. download the index part of the template timeline
|
||||
// TODO can we hardcode the generation here or should we pass it as parameter?
|
||||
let template_generation = Generation::new(1);
|
||||
let (template_index_part, template_generation, _) =
|
||||
remote_timeline_client::download_template_index_part(
|
||||
&self.remote_storage,
|
||||
&template_tenant_id,
|
||||
&template_timeline_id,
|
||||
template_generation,
|
||||
&self.cancel,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| CreateTimelineError::Other(e.into()))?;
|
||||
|
||||
tracing::info!(
|
||||
"downloaded template index_part.json with generation {}",
|
||||
template_generation.get_suffix()
|
||||
);
|
||||
|
||||
// 2. create the timeline, initializing the index part of the new timeline with the layers from the template
|
||||
let template_metadata = &template_index_part.metadata;
|
||||
|
||||
let new_metadata = TimelineMetadata::new(
|
||||
template_metadata.disk_consistent_lsn(),
|
||||
None,
|
||||
None,
|
||||
Lsn(0),
|
||||
template_metadata.latest_gc_cutoff_lsn(),
|
||||
template_metadata.initdb_lsn(),
|
||||
template_metadata.pg_version(),
|
||||
);
|
||||
let mut index_part = IndexPart::empty(new_metadata.clone());
|
||||
index_part.layer_metadata = template_index_part
|
||||
.layer_metadata
|
||||
.iter()
|
||||
.map(|(layer_name, metadata)| {
|
||||
// TODO support local sharing of layers
|
||||
let mut metadata = metadata.clone();
|
||||
metadata.template_ttid = Some((template_tenant_id, template_timeline_id));
|
||||
(layer_name.clone(), metadata)
|
||||
})
|
||||
.collect();
|
||||
let resources = self.build_timeline_resources(new_timeline_id);
|
||||
let _res = self
|
||||
.load_remote_timeline(
|
||||
new_timeline_id,
|
||||
index_part,
|
||||
new_metadata.clone(),
|
||||
None,
|
||||
resources,
|
||||
LoadTimelineCause::Attach,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let timeline = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let Some(timeline) = timelines.get(&new_timeline_id) else {
|
||||
warn!("timeline not available directly after attach");
|
||||
panic!();
|
||||
};
|
||||
let offloaded_timelines = self.timelines_offloaded.lock().unwrap();
|
||||
self.initialize_gc_info(&timelines, &offloaded_timelines, Some(new_timeline_id));
|
||||
|
||||
Arc::clone(timeline)
|
||||
};
|
||||
|
||||
// Callers are responsible to wait for uploads to complete and for activating the timeline.
|
||||
|
||||
timeline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_full_metadata_update(&new_metadata)
|
||||
.context("imported timeline initial metadata upload")?;
|
||||
|
||||
// 4. finish
|
||||
Ok(CreateTimelineResult::Created(timeline))
|
||||
}
|
||||
|
||||
/// The returned [`Arc<Timeline>`] is NOT in the [`TenantShard::timelines`] map until the import
|
||||
/// completes in the background. A DIFFERENT [`Arc<Timeline>`] will be inserted into the
|
||||
/// [`TenantShard::timelines`] map when the import completes.
|
||||
@@ -5683,7 +5798,6 @@ pub(crate) mod harness {
|
||||
use pageserver_api::models::ShardParameters;
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use utils::id::TenantId;
|
||||
use utils::logging;
|
||||
|
||||
use super::*;
|
||||
@@ -8596,8 +8710,10 @@ mod tests {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<Bytes>, GetVectoredError> {
|
||||
let io_concurrency =
|
||||
IoConcurrency::spawn_from_conf(tline.conf, tline.gate.enter().unwrap());
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
tline.conf.get_vectored_concurrent_io,
|
||||
tline.gate.enter().unwrap(),
|
||||
);
|
||||
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
|
||||
let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key..key.next()), lsn);
|
||||
let mut res = tline
|
||||
|
||||
@@ -189,8 +189,9 @@ use anyhow::Context;
|
||||
use camino::Utf8Path;
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
pub(crate) use download::{
|
||||
download_index_part, download_initdb_tar_zst, download_tenant_manifest, is_temp_download_file,
|
||||
list_remote_tenant_shards, list_remote_timelines,
|
||||
download_index_part, download_initdb_tar_zst, download_template_index_part,
|
||||
download_tenant_manifest, is_temp_download_file, list_remote_tenant_shards,
|
||||
list_remote_timelines,
|
||||
};
|
||||
use index::GcCompactionState;
|
||||
pub(crate) use index::LayerFileMetadata;
|
||||
@@ -1768,7 +1769,7 @@ impl RemoteTimelineClient {
|
||||
adopted_as: &Layer,
|
||||
cancel: &CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let source_remote_path = remote_layer_path(
|
||||
let source_remote_path = remote_layer_path_maybe_template(
|
||||
&self.tenant_shard_id.tenant_id,
|
||||
&adopted
|
||||
.get_timeline_id()
|
||||
@@ -1776,6 +1777,7 @@ impl RemoteTimelineClient {
|
||||
adopted.metadata().shard,
|
||||
&adopted.layer_desc().layer_name(),
|
||||
adopted.metadata().generation,
|
||||
adopted.metadata().template_ttid,
|
||||
);
|
||||
|
||||
let target_remote_path = remote_layer_path(
|
||||
@@ -2684,6 +2686,31 @@ pub fn remote_layer_path(
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_layer_path_maybe_template(
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
shard: ShardIndex,
|
||||
layer_file_name: &LayerName,
|
||||
generation: Generation,
|
||||
template_ttid: Option<(TenantShardId, TimelineId)>,
|
||||
) -> RemotePath {
|
||||
if let Some((template_tenant_shard_id, template_timeline_id)) = template_ttid {
|
||||
let template_tenant_id = template_tenant_shard_id.tenant_id;
|
||||
let template_shard_id = template_tenant_shard_id.to_index();
|
||||
// Generation-aware key format
|
||||
let path = format!(
|
||||
"templates/{template_tenant_id}{0}/{TIMELINES_SEGMENT_NAME}/{template_timeline_id}/{1}{2}",
|
||||
template_shard_id.get_suffix(),
|
||||
layer_file_name,
|
||||
generation.get_suffix()
|
||||
);
|
||||
|
||||
RemotePath::from_string(&path).expect("Failed to construct path")
|
||||
} else {
|
||||
remote_layer_path(tenant_id, timeline_id, shard, layer_file_name, generation)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if a and b have the same layer path within a tenant/timeline. This is essentially
|
||||
/// remote_layer_path(a) == remote_layer_path(b) without the string allocations.
|
||||
///
|
||||
@@ -2729,6 +2756,19 @@ pub fn remote_index_path(
|
||||
.expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub fn remote_template_index_path(
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
generation: Generation,
|
||||
) -> RemotePath {
|
||||
RemotePath::from_string(&format!(
|
||||
"templates/{tenant_shard_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}",
|
||||
IndexPart::FILE_NAME,
|
||||
generation.get_suffix()
|
||||
))
|
||||
.expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath {
|
||||
RemotePath::from_string(&format!(
|
||||
"tenants/{tenant_shard_id}/{TENANT_HEATMAP_BASENAME}"
|
||||
|
||||
@@ -29,7 +29,7 @@ use super::manifest::TenantManifest;
|
||||
use super::{
|
||||
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, INITDB_PATH, parse_remote_index_path,
|
||||
parse_remote_tenant_manifest_path, remote_index_path, remote_initdb_archive_path,
|
||||
remote_initdb_preserved_archive_path, remote_tenant_manifest_path,
|
||||
remote_initdb_preserved_archive_path, remote_template_index_path, remote_tenant_manifest_path,
|
||||
remote_tenant_manifest_prefix, remote_tenant_path,
|
||||
};
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
@@ -39,7 +39,9 @@ use crate::span::{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id, debug_assert_current_span_has_tenant_id,
|
||||
};
|
||||
use crate::tenant::Generation;
|
||||
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
|
||||
use crate::tenant::remote_timeline_client::{
|
||||
remote_layer_path_maybe_template, remote_timelines_path,
|
||||
};
|
||||
use crate::tenant::storage_layer::LayerName;
|
||||
use crate::virtual_file;
|
||||
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
|
||||
@@ -68,12 +70,13 @@ pub async fn download_layer_file<'a>(
|
||||
|
||||
let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
|
||||
|
||||
let remote_path = remote_layer_path(
|
||||
let remote_path = remote_layer_path_maybe_template(
|
||||
&tenant_shard_id.tenant_id,
|
||||
&timeline_id,
|
||||
layer_metadata.shard,
|
||||
layer_file_name,
|
||||
layer_metadata.generation,
|
||||
layer_metadata.template_ttid,
|
||||
);
|
||||
|
||||
let (bytes_amount, temp_file) = download_retry(
|
||||
@@ -393,6 +396,32 @@ async fn do_download_index_part(
|
||||
Ok((index_part, index_generation, index_part_mtime))
|
||||
}
|
||||
|
||||
async fn do_download_template_index_part(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: Option<&TimelineId>,
|
||||
index_generation: Generation,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
|
||||
let timeline_id =
|
||||
timeline_id.expect("A timeline ID is always provided when downloading an index");
|
||||
let remote_path = remote_template_index_path(tenant_shard_id, timeline_id, index_generation);
|
||||
|
||||
let download_opts = DownloadOpts {
|
||||
kind: DownloadKind::Small,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let (index_part_bytes, index_part_mtime) =
|
||||
do_download_remote_path_retry_forever(storage, &remote_path, download_opts, cancel).await?;
|
||||
|
||||
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
|
||||
.with_context(|| format!("deserialize index part file at {remote_path:?}"))
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
Ok((index_part, index_generation, index_part_mtime))
|
||||
}
|
||||
|
||||
/// Metadata objects are "generationed", meaning that they include a generation suffix. This
|
||||
/// function downloads the object with the highest generation <= `my_generation`.
|
||||
///
|
||||
@@ -559,6 +588,35 @@ pub(crate) async fn download_index_part(
|
||||
.await
|
||||
}
|
||||
|
||||
/// index_part.json objects are suffixed with a generation number, so we cannot
|
||||
/// directly GET the latest index part without doing some probing.
|
||||
///
|
||||
/// In this function we probe for the most recent index in a generation <= our current generation.
|
||||
/// See "Finding the remote indices for timelines" in docs/rfcs/025-generation-numbers.md
|
||||
pub(crate) async fn download_template_index_part(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
my_generation: Generation,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<(IndexPart, Generation, SystemTime), DownloadError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let index_prefix = remote_template_index_path(tenant_shard_id, timeline_id, Generation::none());
|
||||
download_generation_object(
|
||||
storage,
|
||||
tenant_shard_id,
|
||||
Some(timeline_id),
|
||||
my_generation,
|
||||
"index_part",
|
||||
index_prefix,
|
||||
do_download_template_index_part,
|
||||
parse_remote_index_path,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn download_tenant_manifest(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
|
||||
@@ -12,6 +12,7 @@ use pageserver_api::shard::ShardIndex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
use super::is_same_remote_layer_path;
|
||||
use crate::tenant::Generation;
|
||||
@@ -233,6 +234,10 @@ pub struct LayerFileMetadata {
|
||||
#[serde(default = "ShardIndex::unsharded")]
|
||||
#[serde(skip_serializing_if = "ShardIndex::is_unsharded")]
|
||||
pub shard: ShardIndex,
|
||||
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub template_ttid: Option<(TenantShardId, TimelineId)>,
|
||||
}
|
||||
|
||||
impl LayerFileMetadata {
|
||||
@@ -241,6 +246,7 @@ impl LayerFileMetadata {
|
||||
file_size,
|
||||
generation,
|
||||
shard,
|
||||
template_ttid: None,
|
||||
}
|
||||
}
|
||||
/// Helper to get both generation and file size in a tuple
|
||||
|
||||
@@ -31,6 +31,7 @@ pub use inmemory_layer::InMemoryLayer;
|
||||
pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
|
||||
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
|
||||
pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
|
||||
use pageserver_api::config::GetVectoredConcurrentIo;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
@@ -43,7 +44,6 @@ use self::inmemory_layer::InMemoryLayerFileId;
|
||||
use super::PageReconstructError;
|
||||
use super::layer_map::InMemoryLayerDesc;
|
||||
use super::timeline::{GetVectoredError, ReadPath};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
@@ -318,11 +318,10 @@ impl IoConcurrency {
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_from_conf(
|
||||
conf: &'static PageServerConf,
|
||||
conf: GetVectoredConcurrentIo,
|
||||
gate_guard: GateGuard,
|
||||
) -> IoConcurrency {
|
||||
use pageserver_api::config::GetVectoredConcurrentIo;
|
||||
let selected = match conf.get_vectored_concurrent_io {
|
||||
let selected = match conf {
|
||||
GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
|
||||
GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
|
||||
};
|
||||
|
||||
@@ -185,6 +185,7 @@ impl Layer {
|
||||
None,
|
||||
metadata.generation,
|
||||
metadata.shard,
|
||||
metadata.template_ttid,
|
||||
)));
|
||||
|
||||
debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
|
||||
@@ -225,6 +226,7 @@ impl Layer {
|
||||
Some(inner),
|
||||
metadata.generation,
|
||||
metadata.shard,
|
||||
metadata.template_ttid,
|
||||
)
|
||||
}));
|
||||
|
||||
@@ -273,6 +275,7 @@ impl Layer {
|
||||
Some(inner),
|
||||
timeline.generation,
|
||||
timeline.get_shard_index(),
|
||||
None,
|
||||
)
|
||||
}));
|
||||
|
||||
@@ -710,6 +713,8 @@ struct LayerInner {
|
||||
/// a shard split since the layer was originally written.
|
||||
shard: ShardIndex,
|
||||
|
||||
template_ttid: Option<(TenantShardId, TimelineId)>,
|
||||
|
||||
/// When the Layer was last evicted but has not been downloaded since.
|
||||
///
|
||||
/// This is used for skipping evicted layers from the previous heatmap (see
|
||||
@@ -853,6 +858,7 @@ impl LayerInner {
|
||||
downloaded: Option<Arc<DownloadedLayer>>,
|
||||
generation: Generation,
|
||||
shard: ShardIndex,
|
||||
template_ttid: Option<(TenantShardId, TimelineId)>,
|
||||
) -> Self {
|
||||
let (inner, version, init_status) = if let Some(inner) = downloaded {
|
||||
let version = inner.version;
|
||||
@@ -888,6 +894,7 @@ impl LayerInner {
|
||||
consecutive_failures: AtomicUsize::new(0),
|
||||
generation,
|
||||
shard,
|
||||
template_ttid,
|
||||
last_evicted_at: std::sync::Mutex::default(),
|
||||
#[cfg(test)]
|
||||
failpoints: Default::default(),
|
||||
@@ -1623,7 +1630,9 @@ impl LayerInner {
|
||||
}
|
||||
|
||||
fn metadata(&self) -> LayerFileMetadata {
|
||||
LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard)
|
||||
let mut metadata = LayerFileMetadata::new(self.desc.file_size, self.generation, self.shard);
|
||||
metadata.template_ttid = self.template_ttid;
|
||||
metadata
|
||||
}
|
||||
|
||||
/// Needed to use entered runtime in tests, but otherwise use BACKGROUND_RUNTIME.
|
||||
@@ -1771,13 +1780,19 @@ impl DownloadedLayer {
|
||||
"these are the same, just avoiding the upgrade"
|
||||
);
|
||||
|
||||
let (ex_tenant_id, ex_timeline_id) =
|
||||
if let Some((tenant_id, timeline_id)) = owner.template_ttid {
|
||||
(tenant_id.tenant_id, timeline_id)
|
||||
} else {
|
||||
(owner.desc.tenant_shard_id.tenant_id, owner.desc.timeline_id)
|
||||
};
|
||||
let res = if owner.desc.is_delta {
|
||||
let ctx = RequestContextBuilder::from(ctx)
|
||||
.page_content_kind(crate::context::PageContentKind::DeltaLayerSummary)
|
||||
.attached_child();
|
||||
let summary = Some(delta_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
owner.desc.timeline_id,
|
||||
ex_tenant_id,
|
||||
ex_timeline_id,
|
||||
owner.desc.key_range.clone(),
|
||||
owner.desc.lsn_range.clone(),
|
||||
));
|
||||
@@ -1795,8 +1810,8 @@ impl DownloadedLayer {
|
||||
.attached_child();
|
||||
let lsn = owner.desc.image_layer_lsn();
|
||||
let summary = Some(image_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
owner.desc.timeline_id,
|
||||
ex_tenant_id,
|
||||
ex_timeline_id,
|
||||
owner.desc.key_range.clone(),
|
||||
lsn,
|
||||
));
|
||||
|
||||
@@ -3530,7 +3530,7 @@ impl Timeline {
|
||||
};
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self_ref.conf,
|
||||
self_ref.conf.get_vectored_concurrent_io,
|
||||
self_ref
|
||||
.gate
|
||||
.enter()
|
||||
@@ -5559,7 +5559,7 @@ impl Timeline {
|
||||
});
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| CreateImageLayersError::Cancelled)?,
|
||||
|
||||
@@ -188,7 +188,7 @@ pub(crate) async fn generate_tombstone_image_layer(
|
||||
"removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
|
||||
);
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
detached.conf,
|
||||
detached.conf.get_vectored_concurrent_io,
|
||||
detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
|
||||
);
|
||||
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
|
||||
|
||||
@@ -936,6 +936,44 @@ lfc_prewarm_main(Datum main_arg)
|
||||
lfc_ctl->prewarm_workers[worker_id].completed = GetCurrentTimestamp();
|
||||
}
|
||||
|
||||
void
|
||||
lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks)
|
||||
{
|
||||
BufferTag tag;
|
||||
FileCacheEntry *entry;
|
||||
uint32 hash;
|
||||
|
||||
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
|
||||
return;
|
||||
|
||||
CopyNRelFileInfoToBufTag(tag, rinfo);
|
||||
tag.forkNum = forkNum;
|
||||
|
||||
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
if (LFC_ENABLED())
|
||||
{
|
||||
for (BlockNumber blkno = 0; blkno < nblocks; blkno += lfc_blocks_per_chunk)
|
||||
{
|
||||
tag.blockNum = blkno;
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
if (entry != NULL)
|
||||
{
|
||||
for (int i = 0; i < lfc_blocks_per_chunk; i++)
|
||||
{
|
||||
if (GET_STATE(entry, i) == AVAILABLE)
|
||||
{
|
||||
lfc_ctl->used_pages -= 1;
|
||||
SET_STATE(entry, i, UNAVAILABLE);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
LWLockRelease(lfc_lock);
|
||||
}
|
||||
|
||||
/*
|
||||
* Check if page is present in the cache.
|
||||
|
||||
@@ -28,6 +28,7 @@ typedef struct FileCacheState
|
||||
extern bool lfc_store_prefetch_result;
|
||||
|
||||
/* functions for local file cache */
|
||||
extern void lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks);
|
||||
extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber blkno, const void *const *buffers,
|
||||
BlockNumber nblocks);
|
||||
|
||||
@@ -86,7 +86,7 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
|
||||
|
||||
#define InvalidRelFileNumber InvalidOid
|
||||
|
||||
#define SMgrRelGetRelInfo(reln) \
|
||||
#define SMgrRelGetRelInfo(reln) \
|
||||
(reln->smgr_rnode.node)
|
||||
|
||||
#define DropRelationAllLocalBuffers DropRelFileNodeAllLocalBuffers
|
||||
@@ -148,6 +148,12 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
|
||||
#define DropRelationAllLocalBuffers DropRelationAllLocalBuffers
|
||||
#endif
|
||||
|
||||
#define NRelFileInfoInvalidate(rinfo) do { \
|
||||
NInfoGetSpcOid(rinfo) = InvalidOid; \
|
||||
NInfoGetDbOid(rinfo) = InvalidOid; \
|
||||
NInfoGetRelNumber(rinfo) = InvalidRelFileNumber; \
|
||||
} while (0)
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 17
|
||||
#define ProcNumber BackendId
|
||||
#define INVALID_PROC_NUMBER InvalidBackendId
|
||||
|
||||
@@ -108,7 +108,7 @@ typedef enum
|
||||
UNLOGGED_BUILD_NOT_PERMANENT
|
||||
} UnloggedBuildPhase;
|
||||
|
||||
static SMgrRelation unlogged_build_rel = NULL;
|
||||
static NRelFileInfo unlogged_build_rel_info;
|
||||
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
|
||||
static bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
|
||||
@@ -912,16 +912,19 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
|
||||
{
|
||||
case 0:
|
||||
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
mdextend(reln, forkNum, blkno, buffer, skipFsync);
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
case RELPERSISTENCE_UNLOGGED:
|
||||
mdextend(reln, forkNum, blkno, buffer, skipFsync);
|
||||
/* Update LFC in case of unlogged index build */
|
||||
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
|
||||
lfc_write(InfoFromSMgrRel(reln), forkNum, blkno, buffer);
|
||||
return;
|
||||
|
||||
default:
|
||||
@@ -1003,21 +1006,19 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
|
||||
{
|
||||
case 0:
|
||||
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
mdzeroextend(reln, forkNum, blocknum, nblocks, skipFsync);
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
case RELPERSISTENCE_UNLOGGED:
|
||||
mdzeroextend(reln, forkNum, blocknum, nblocks, skipFsync);
|
||||
/* Update LFC in case of unlogged index build */
|
||||
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
|
||||
{
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
lfc_write(InfoFromSMgrRel(reln), forkNum, blocknum + i, buffer.data);
|
||||
}
|
||||
}
|
||||
return;
|
||||
|
||||
default:
|
||||
@@ -1387,8 +1388,14 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
{
|
||||
case 0:
|
||||
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
mdread(reln, forkNum, blkno, buffer);
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
@@ -1474,8 +1481,14 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
{
|
||||
case 0:
|
||||
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
mdreadv(reln, forknum, blocknum, buffers, nblocks);
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
@@ -1608,6 +1621,15 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
mdwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
|
||||
#else
|
||||
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
|
||||
#endif
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
@@ -1617,9 +1639,6 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
|
||||
#else
|
||||
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
|
||||
#endif
|
||||
/* Update LFC in case of unlogged index build */
|
||||
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
|
||||
lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer);
|
||||
return;
|
||||
default:
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
@@ -1680,14 +1699,16 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
case RELPERSISTENCE_UNLOGGED:
|
||||
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
|
||||
/* Update LFC in case of unlogged index build */
|
||||
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
|
||||
lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks);
|
||||
return;
|
||||
default:
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
@@ -1723,6 +1744,10 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
return mdnblocks(reln, forknum);
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
@@ -1792,6 +1817,11 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
mdtruncate(reln, forknum, old_blocks, nblocks);
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
@@ -1930,7 +1960,6 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
*/
|
||||
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS)
|
||||
neon_log(ERROR, "unlogged relation build is already in progress");
|
||||
Assert(unlogged_build_rel == NULL);
|
||||
|
||||
ereport(SmgrTrace,
|
||||
(errmsg(NEON_TAG "starting unlogged build of relation %u/%u/%u",
|
||||
@@ -1947,7 +1976,7 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
case RELPERSISTENCE_UNLOGGED:
|
||||
unlogged_build_rel = reln;
|
||||
unlogged_build_rel_info = InfoFromSMgrRel(reln);
|
||||
unlogged_build_phase = UNLOGGED_BUILD_NOT_PERMANENT;
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (!IsParallelWorker())
|
||||
@@ -1968,12 +1997,9 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
|
||||
#endif
|
||||
|
||||
unlogged_build_rel = reln;
|
||||
unlogged_build_rel_info = InfoFromSMgrRel(reln);
|
||||
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;
|
||||
|
||||
/* Make the relation look like it's unlogged */
|
||||
reln->smgr_relpersistence = RELPERSISTENCE_UNLOGGED;
|
||||
|
||||
/*
|
||||
* Create the local file. In a parallel build, the leader is expected to
|
||||
* call this first and do it.
|
||||
@@ -2000,17 +2026,16 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
static void
|
||||
neon_finish_unlogged_build_phase_1(SMgrRelation reln)
|
||||
{
|
||||
Assert(unlogged_build_rel == reln);
|
||||
Assert(RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)));
|
||||
|
||||
ereport(SmgrTrace,
|
||||
(errmsg(NEON_TAG "finishing phase 1 of unlogged build of relation %u/%u/%u",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)))));
|
||||
RelFileInfoFmt((unlogged_build_rel_info)))));
|
||||
|
||||
if (unlogged_build_phase == UNLOGGED_BUILD_NOT_PERMANENT)
|
||||
return;
|
||||
|
||||
Assert(unlogged_build_phase == UNLOGGED_BUILD_PHASE_1);
|
||||
Assert(reln->smgr_relpersistence == RELPERSISTENCE_UNLOGGED);
|
||||
|
||||
/*
|
||||
* In a parallel build, (only) the leader process performs the 2nd
|
||||
@@ -2018,7 +2043,7 @@ neon_finish_unlogged_build_phase_1(SMgrRelation reln)
|
||||
*/
|
||||
if (IsParallelWorker())
|
||||
{
|
||||
unlogged_build_rel = NULL;
|
||||
NRelFileInfoInvalidate(unlogged_build_rel_info);
|
||||
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
}
|
||||
else
|
||||
@@ -2039,11 +2064,11 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
{
|
||||
NRelFileInfoBackend rinfob = InfoBFromSMgrRel(reln);
|
||||
|
||||
Assert(unlogged_build_rel == reln);
|
||||
Assert(RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)));
|
||||
|
||||
ereport(SmgrTrace,
|
||||
(errmsg(NEON_TAG "ending unlogged build of relation %u/%u/%u",
|
||||
RelFileInfoFmt(InfoFromNInfoB(rinfob)))));
|
||||
RelFileInfoFmt(unlogged_build_rel_info))));
|
||||
|
||||
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_PERMANENT)
|
||||
{
|
||||
@@ -2051,7 +2076,6 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
BlockNumber nblocks;
|
||||
|
||||
Assert(unlogged_build_phase == UNLOGGED_BUILD_PHASE_2);
|
||||
Assert(reln->smgr_relpersistence == RELPERSISTENCE_UNLOGGED);
|
||||
|
||||
/*
|
||||
* Update the last-written LSN cache.
|
||||
@@ -2072,9 +2096,6 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
InfoFromNInfoB(rinfob),
|
||||
MAIN_FORKNUM);
|
||||
|
||||
/* Make the relation look permanent again */
|
||||
reln->smgr_relpersistence = RELPERSISTENCE_PERMANENT;
|
||||
|
||||
/* Remove local copy */
|
||||
for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++)
|
||||
{
|
||||
@@ -2083,6 +2104,8 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
forknum);
|
||||
|
||||
forget_cached_relsize(InfoFromNInfoB(rinfob), forknum);
|
||||
lfc_invalidate(InfoFromNInfoB(rinfob), forknum, nblocks);
|
||||
|
||||
mdclose(reln, forknum);
|
||||
#ifndef DEBUG_COMPARE_LOCAL
|
||||
/* use isRedo == true, so that we drop it immediately */
|
||||
@@ -2093,7 +2116,7 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
mdunlink(rinfob, INIT_FORKNUM, true);
|
||||
#endif
|
||||
}
|
||||
unlogged_build_rel = NULL;
|
||||
NRelFileInfoInvalidate(unlogged_build_rel_info);
|
||||
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
}
|
||||
|
||||
@@ -2166,7 +2189,7 @@ AtEOXact_neon(XactEvent event, void *arg)
|
||||
* Forget about any build we might have had in progress. The local
|
||||
* file will be unlinked by smgrDoPendingDeletes()
|
||||
*/
|
||||
unlogged_build_rel = NULL;
|
||||
NRelFileInfoInvalidate(unlogged_build_rel_info);
|
||||
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
break;
|
||||
|
||||
@@ -2178,7 +2201,7 @@ AtEOXact_neon(XactEvent event, void *arg)
|
||||
case XACT_EVENT_PRE_PREPARE:
|
||||
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS)
|
||||
{
|
||||
unlogged_build_rel = NULL;
|
||||
NRelFileInfoInvalidate(unlogged_build_rel_info);
|
||||
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INTERNAL_ERROR),
|
||||
|
||||
15
poetry.lock
generated
15
poetry.lock
generated
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aiohappyeyeballs"
|
||||
@@ -1145,18 +1145,19 @@ dotenv = ["python-dotenv"]
|
||||
|
||||
[[package]]
|
||||
name = "flask-cors"
|
||||
version = "5.0.0"
|
||||
description = "A Flask extension adding a decorator for CORS support"
|
||||
version = "6.0.0"
|
||||
description = "A Flask extension simplifying CORS support"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
python-versions = "<4.0,>=3.9"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "Flask_Cors-5.0.0-py2.py3-none-any.whl", hash = "sha256:b9e307d082a9261c100d8fb0ba909eec6a228ed1b60a8315fd85f783d61910bc"},
|
||||
{file = "flask_cors-5.0.0.tar.gz", hash = "sha256:5aadb4b950c4e93745034594d9f3ea6591f734bb3662e16e255ffbf5e89c88ef"},
|
||||
{file = "flask_cors-6.0.0-py3-none-any.whl", hash = "sha256:6332073356452343a8ccddbfec7befdc3fdd040141fe776ec9b94c262f058657"},
|
||||
{file = "flask_cors-6.0.0.tar.gz", hash = "sha256:4592c1570246bf7beee96b74bc0adbbfcb1b0318f6ba05c412e8909eceec3393"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
Flask = ">=0.9"
|
||||
flask = ">=0.9"
|
||||
Werkzeug = ">=0.7"
|
||||
|
||||
[[package]]
|
||||
name = "frozenlist"
|
||||
|
||||
@@ -161,8 +161,11 @@ struct ProxyCliArgs {
|
||||
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_REDIS_SET)]
|
||||
redis_rps_limit: Vec<RateBucketInfo>,
|
||||
/// Cancellation channel size (max queue size for redis kv client)
|
||||
#[clap(long, default_value = "1024")]
|
||||
#[clap(long, default_value_t = 1024)]
|
||||
cancellation_ch_size: usize,
|
||||
/// Cancellation ops batch size for redis
|
||||
#[clap(long, default_value_t = 8)]
|
||||
cancellation_batch_size: usize,
|
||||
/// cache for `allowed_ips` (use `size=0` to disable)
|
||||
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
|
||||
allowed_ips_cache: String,
|
||||
@@ -542,7 +545,12 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
if let Some(mut redis_kv_client) = redis_kv_client {
|
||||
maintenance_tasks.spawn(async move {
|
||||
redis_kv_client.try_connect().await?;
|
||||
handle_cancel_messages(&mut redis_kv_client, rx_cancel).await?;
|
||||
handle_cancel_messages(
|
||||
&mut redis_kv_client,
|
||||
rx_cancel,
|
||||
args.cancellation_batch_size,
|
||||
)
|
||||
.await?;
|
||||
|
||||
drop(redis_kv_client);
|
||||
|
||||
|
||||
@@ -30,8 +30,6 @@ use crate::tls::postgres_rustls::MakeRustlsConnect;
|
||||
type IpSubnetKey = IpNet;
|
||||
|
||||
const CANCEL_KEY_TTL: i64 = 1_209_600; // 2 weeks cancellation key expire time
|
||||
const REDIS_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10);
|
||||
const BATCH_SIZE: usize = 8;
|
||||
|
||||
// Message types for sending through mpsc channel
|
||||
pub enum CancelKeyOp {
|
||||
@@ -231,12 +229,13 @@ impl CancelReplyOp {
|
||||
pub async fn handle_cancel_messages(
|
||||
client: &mut RedisKVClient,
|
||||
mut rx: mpsc::Receiver<CancelKeyOp>,
|
||||
batch_size: usize,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut batch = Vec::with_capacity(BATCH_SIZE);
|
||||
let mut pipeline = Pipeline::with_capacity(BATCH_SIZE);
|
||||
let mut batch = Vec::with_capacity(batch_size);
|
||||
let mut pipeline = Pipeline::with_capacity(batch_size);
|
||||
|
||||
loop {
|
||||
if rx.recv_many(&mut batch, BATCH_SIZE).await == 0 {
|
||||
if rx.recv_many(&mut batch, batch_size).await == 0 {
|
||||
warn!("shutting down cancellation queue");
|
||||
break Ok(());
|
||||
}
|
||||
@@ -367,8 +366,7 @@ impl CancellationHandler {
|
||||
return Err(CancelError::InternalError);
|
||||
};
|
||||
|
||||
tx.send_timeout(op, REDIS_SEND_TIMEOUT)
|
||||
.await
|
||||
tx.try_send(op)
|
||||
.map_err(|e| {
|
||||
tracing::warn!("failed to send GetCancelData for {key}: {e}");
|
||||
})
|
||||
@@ -570,7 +568,7 @@ impl Session {
|
||||
}
|
||||
|
||||
// Send the store key op to the cancellation handler and set TTL for the key
|
||||
pub(crate) async fn write_cancel_key(
|
||||
pub(crate) fn write_cancel_key(
|
||||
&self,
|
||||
cancel_closure: CancelClosure,
|
||||
) -> Result<(), CancelError> {
|
||||
@@ -596,14 +594,14 @@ impl Session {
|
||||
expire: CANCEL_KEY_TTL,
|
||||
};
|
||||
|
||||
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
|
||||
let _ = tx.try_send(op).map_err(|e| {
|
||||
let key = self.key;
|
||||
tracing::warn!("failed to send StoreCancelKey for {key}: {e}");
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn remove_cancel_key(&self) -> Result<(), CancelError> {
|
||||
pub(crate) fn remove_cancel_key(&self) -> Result<(), CancelError> {
|
||||
let Some(tx) = &self.cancellation_handler.tx else {
|
||||
tracing::warn!("cancellation handler is not available");
|
||||
return Err(CancelError::InternalError);
|
||||
@@ -619,7 +617,7 @@ impl Session {
|
||||
.guard(RedisMsgKind::HDel),
|
||||
};
|
||||
|
||||
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
|
||||
let _ = tx.try_send(op).map_err(|e| {
|
||||
let key = self.key;
|
||||
tracing::warn!("failed to send RemoveCancelKey for {key}: {e}");
|
||||
});
|
||||
|
||||
@@ -244,9 +244,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session = cancellation_handler_clone.get_key();
|
||||
|
||||
session
|
||||
.write_cancel_key(node.cancel_closure.clone())
|
||||
.await?;
|
||||
session.write_cancel_key(node.cancel_closure.clone())?;
|
||||
|
||||
prepare_client_connection(&node, *session.key(), &mut stream).await?;
|
||||
|
||||
|
||||
@@ -383,9 +383,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session = cancellation_handler_clone.get_key();
|
||||
|
||||
session
|
||||
.write_cancel_key(node.cancel_closure.clone())
|
||||
.await?;
|
||||
session.write_cancel_key(node.cancel_closure.clone())?;
|
||||
|
||||
prepare_client_connection(&node, *session.key(), &mut stream).await?;
|
||||
|
||||
|
||||
@@ -94,7 +94,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
|
||||
tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
|
||||
}
|
||||
|
||||
drop(self.cancel.remove_cancel_key().await); // we don't need a result. If the queue is full, we just log the error
|
||||
drop(self.cancel.remove_cancel_key()); // we don't need a result. If the queue is full, we just log the error
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
@@ -103,7 +103,7 @@ class AbstractNeonCli:
|
||||
else:
|
||||
stdout = ""
|
||||
|
||||
log.warn(f"CLI timeout: stderr={stderr}, stdout={stdout}")
|
||||
log.warning(f"CLI timeout: stderr={stderr}, stdout={stdout}")
|
||||
raise
|
||||
|
||||
indent = " "
|
||||
|
||||
@@ -510,7 +510,7 @@ def list_elegible_layers(
|
||||
except KeyError:
|
||||
# Unexpected: tests should call this when pageservers are in a quiet state such that the layer map
|
||||
# matches what's on disk.
|
||||
log.warn(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
|
||||
log.warning(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
|
||||
raise
|
||||
|
||||
return list(c for c in candidates if is_visible(c))
|
||||
@@ -636,7 +636,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
except:
|
||||
# On assertion failures, log some details to help with debugging
|
||||
heatmap = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
log.warn(f"heatmap contents: {json.dumps(heatmap, indent=2)}")
|
||||
log.warning(f"heatmap contents: {json.dumps(heatmap, indent=2)}")
|
||||
raise
|
||||
|
||||
# Scrub the remote storage
|
||||
|
||||
190
test_runner/regress/test_templates.py
Normal file
190
test_runner/regress/test_templates.py
Normal file
@@ -0,0 +1,190 @@
|
||||
import base64
|
||||
import json
|
||||
import shutil
|
||||
import time
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from threading import Event
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.errors
|
||||
import pytest
|
||||
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
|
||||
from fixtures.fast_import import FastImport
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
PgProtocol,
|
||||
VanillaPostgres,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.http import (
|
||||
ImportPgdataIdemptencyKey,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import LocalFsStorage, MockS3Server, RemoteStorageKind
|
||||
from fixtures.utils import (
|
||||
run_only_on_default_postgres,
|
||||
shared_buffers_for_max_cu,
|
||||
skip_in_debug_build,
|
||||
wait_until,
|
||||
)
|
||||
from fixtures.workload import Workload
|
||||
from mypy_boto3_kms import KMSClient
|
||||
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
|
||||
from mypy_boto3_s3 import S3Client
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
num_rows = 1000
|
||||
|
||||
|
||||
def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path):
|
||||
"""
|
||||
Mock the import S3 bucket into a local directory for a provided vanilla PG instance.
|
||||
"""
|
||||
assert not vanilla_pg.is_running()
|
||||
|
||||
path.mkdir()
|
||||
# what cplane writes before scheduling fast_import
|
||||
specpath = path / "spec.json"
|
||||
specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
|
||||
# what fast_import writes
|
||||
vanilla_pg.pgdatadir.rename(path / "pgdata")
|
||||
statusdir = path / "status"
|
||||
statusdir.mkdir()
|
||||
(statusdir / "pgdata").write_text(json.dumps({"done": True}))
|
||||
(statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
|
||||
|
||||
|
||||
def test_template_smoke(neon_env_builder: NeonEnvBuilder):
|
||||
shard_count = 1
|
||||
stripe_size = 1024
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
|
||||
|
||||
log.info("create template data")
|
||||
|
||||
template_tenant_id = TenantId.generate()
|
||||
template_timeline_id = TimelineId.generate()
|
||||
template_tenant_shard_id = TenantShardId(template_tenant_id, 0, 1)
|
||||
|
||||
env.create_tenant(shard_count=1, tenant_id=template_tenant_id, timeline_id=template_timeline_id)
|
||||
|
||||
def validate_data_equivalence(ep):
|
||||
# TODO: would be nicer to just compare pgdump
|
||||
|
||||
# Enable IO concurrency for batching on large sequential scan, to avoid making
|
||||
# this test unnecessarily onerous on CPU. Especially on debug mode, it's still
|
||||
# pretty onerous though, so increase statement_timeout to avoid timeouts.
|
||||
assert ep.safe_psql_many(
|
||||
[
|
||||
"set effective_io_concurrency=32;",
|
||||
"SET statement_timeout='300s';",
|
||||
"select count(*), sum(data::bigint)::bigint from t",
|
||||
]
|
||||
) == [[], [], [(expect_nrows, expect_sum)]]
|
||||
|
||||
# Fill the template with some data
|
||||
with env.endpoints.create_start("main", tenant_id=template_tenant_id) as endpoint:
|
||||
# fillfactor so we don't need to produce that much data
|
||||
# 900 byte per row is > 10% => 1 row per page
|
||||
endpoint.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
|
||||
|
||||
nrows = 0
|
||||
|
||||
target_relblock_size = 1024 * 8192
|
||||
|
||||
while True:
|
||||
relblock_size = endpoint.safe_psql_scalar("select pg_relation_size('t')")
|
||||
log.info(
|
||||
f"relblock size: {relblock_size / 8192} pages (target: {target_relblock_size // 8192}) pages"
|
||||
)
|
||||
if relblock_size >= target_relblock_size:
|
||||
break
|
||||
addrows = int((target_relblock_size - relblock_size) // 8192)
|
||||
assert addrows >= 1, "forward progress"
|
||||
endpoint.safe_psql(
|
||||
f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
|
||||
)
|
||||
nrows += addrows
|
||||
expect_nrows = nrows
|
||||
expect_sum = (
|
||||
(nrows) * (nrows + 1) // 2
|
||||
) # https://stackoverflow.com/questions/43901484/sum-of-the-integers-from-1-to-n
|
||||
|
||||
env.pageserver.http_client().timeline_checkpoint(
|
||||
template_tenant_shard_id, template_timeline_id
|
||||
)
|
||||
wait_for_last_flush_lsn(env, endpoint, template_tenant_id, template_timeline_id)
|
||||
|
||||
validate_data_equivalence(endpoint)
|
||||
|
||||
# Copy the template to the templates dir and delete the original project
|
||||
from_dir = env.pageserver_remote_storage.tenant_path(template_tenant_shard_id)
|
||||
to_dir = env.pageserver_remote_storage.root / "templates" / str(template_tenant_id)
|
||||
shutil.copytree(from_dir, to_dir)
|
||||
|
||||
# Do the template creation
|
||||
new_tenant_id = TenantId.generate()
|
||||
env.storage_controller.tenant_create(new_tenant_id)
|
||||
|
||||
new_timeline_id = TimelineId.generate()
|
||||
log.info("starting timeline creation")
|
||||
start = time.monotonic()
|
||||
|
||||
branch_name = "on_template"
|
||||
env.storage_controller.timeline_create(
|
||||
new_tenant_id,
|
||||
{
|
||||
"new_timeline_id": str(new_timeline_id),
|
||||
"template_tenant_id": str(template_tenant_id),
|
||||
"template_timeline_id": str(template_timeline_id),
|
||||
},
|
||||
)
|
||||
env.neon_cli.mappings_map_branch(branch_name, new_tenant_id, new_timeline_id)
|
||||
|
||||
# Get some timeline details for later.
|
||||
locations = env.storage_controller.locate(new_tenant_id)
|
||||
[shard_zero] = [
|
||||
loc for loc in locations if TenantShardId.parse(loc["shard_id"]).shard_number == 0
|
||||
]
|
||||
shard_zero_ps = env.get_pageserver(shard_zero["node_id"])
|
||||
shard_zero_http = shard_zero_ps.http_client()
|
||||
shard_zero_timeline_info = shard_zero_http.timeline_detail(
|
||||
shard_zero["shard_id"], new_timeline_id
|
||||
)
|
||||
initdb_lsn = Lsn(shard_zero_timeline_info["initdb_lsn"])
|
||||
min_readable_lsn = Lsn(shard_zero_timeline_info["min_readable_lsn"])
|
||||
last_record_lsn = Lsn(shard_zero_timeline_info["last_record_lsn"])
|
||||
disk_consistent_lsn = Lsn(shard_zero_timeline_info["disk_consistent_lsn"])
|
||||
_remote_consistent_lsn = Lsn(shard_zero_timeline_info["remote_consistent_lsn"])
|
||||
remote_consistent_lsn_visible = Lsn(shard_zero_timeline_info["remote_consistent_lsn_visible"])
|
||||
# assert remote_consistent_lsn_visible == remote_consistent_lsn TODO: this fails initially and after restart, presumably because `UploadQueue::clean.1` is still `None`
|
||||
# assert remote_consistent_lsn_visible == disk_consistent_lsn
|
||||
# assert initdb_lsn == min_readable_lsn
|
||||
# assert disk_consistent_lsn == initdb_lsn + 8
|
||||
assert last_record_lsn == disk_consistent_lsn
|
||||
# TODO: assert these values are the same everywhere
|
||||
|
||||
# Last step: validation
|
||||
|
||||
with env.endpoints.create_start(
|
||||
branch_name=branch_name,
|
||||
endpoint_id="ro",
|
||||
tenant_id=new_tenant_id,
|
||||
lsn=last_record_lsn,
|
||||
) as ro_endpoint:
|
||||
validate_data_equivalence(ro_endpoint)
|
||||
|
||||
# ensure the template survives restarts
|
||||
ro_endpoint.stop()
|
||||
env.pageserver.stop(immediate=True)
|
||||
env.pageserver.start()
|
||||
ro_endpoint.start()
|
||||
validate_data_equivalence(ro_endpoint)
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 4cca6f8083...55c0d45abe
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: daa81cffcf...de7640f55d
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 15710a76b7...0bf96bd6d7
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: e5374b7299...8be779fd3a
8
vendor/revisions.json
vendored
8
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.5",
|
||||
"e5374b72997b0afc8374137674e873f7a558120a"
|
||||
"8be779fd3ab9e87206da96a7e4842ef1abf04f44"
|
||||
],
|
||||
"v16": [
|
||||
"16.9",
|
||||
"15710a76b7d07912110fcbbaf0c8ad6d7e5a9fbc"
|
||||
"0bf96bd6d70301a0b43b0b3457bb3cf8fb43c198"
|
||||
],
|
||||
"v15": [
|
||||
"15.13",
|
||||
"daa81cffcf063c54b29a9aabdb6604625f675ad0"
|
||||
"de7640f55da07512834d5cc40c4b3fb376b5f04f"
|
||||
],
|
||||
"v14": [
|
||||
"14.18",
|
||||
"4cca6f8083483dda9e12eae292cf788d45bd561f"
|
||||
"55c0d45abe6467c02084c2192bca117eda6ce1e7"
|
||||
]
|
||||
}
|
||||
|
||||
@@ -60,7 +60,8 @@ lazy_static = { version = "1", default-features = false, features = ["spin_no_st
|
||||
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
memchr = { version = "2" }
|
||||
nix = { version = "0.26" }
|
||||
nix-2f80eeee3b1b6c7e = { package = "nix", version = "0.26" }
|
||||
nix-fa1f6196edfd7249 = { package = "nix", version = "0.30", features = ["dir", "ioctl", "mman", "poll", "signal", "socket"] }
|
||||
nom = { version = "7" }
|
||||
num = { version = "0.4" }
|
||||
num-bigint = { version = "0.4" }
|
||||
|
||||
Reference in New Issue
Block a user