Compare commits

...

15 Commits

Author SHA1 Message Date
Conrad Ludgate
9041907019 rewrite with custom json serializer 2025-05-18 13:41:43 +02:00
Conrad Ludgate
53fdcd252f remove locking from extract, use refcell instead 2025-05-17 22:14:26 +02:00
Conrad Ludgate
f5c5b99b58 remove lasso from json logger, use field index for lookup 2025-05-17 22:14:26 +02:00
Conrad Ludgate
ac331090bf refactor json logging state 2025-05-17 22:14:26 +02:00
Conrad Ludgate
176b5a8978 replace indexset with lasso and linear search 2025-05-17 22:14:26 +02:00
Conrad Ludgate
e0da7dd8e9 use faster hasher than siphash 2025-05-17 22:14:26 +02:00
Conrad Ludgate
547fe38abf replace papaya with hashmap+lock. this assumes that spans are rarely accessed in parallel 2025-05-17 22:14:26 +02:00
Conrad Ludgate
c06f9635f5 remove tracing instrument on passthrough 2025-05-17 22:14:26 +02:00
Konstantin Knizhnik
8e05639dbf Invalidate LFC after unlogged build (#11951)
## Problem


See https://neondb.slack.com/archives/C04DGM6SMTM/p1747391617951239

LFC is not always properly updated during unlogged build so it can
contain stale content.

## Summary of changes

Invalidate LFC content at the end of unlogged build

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-05-17 19:06:59 +00:00
Alexander Bayandin
deed46015d CI(test-images): increase timeout from 20m to 60m (#11955)
## Problem

For some reason (unknown yet) 20m timeout is not enough for
`test-images` job on arm runners.
Ref:
https://github.com/neondatabase/neon/actions/runs/15075321681/job/42387530399?pr=11953

## Summary of changes
- Increase the timeout from 20m to 1h
2025-05-17 06:34:54 +00:00
Heikki Linnakangas
532d9b646e Add simple facility for an extendable shared memory area (#11929)
You still need to provide a max size up-front, but memory is only
allocated for the portion that is in use.

The module is currently unused, but will be used by the new compute
communicator project, in the neon Postgres extension. See
https://github.com/neondatabase/neon/issues/11729

---------

Co-authored-by: Erik Grinaker <erik@neon.tech>
2025-05-16 21:22:36 +00:00
Heikki Linnakangas
55f91cf10b Update 'nix' package (#11948)
There were some incompatible changes. Most churn was from switching from
the now-deprecated fcntl:flock() function to
fcntl::Flock::lock(). The new function returns a guard object, while
with the old function, the lock was associated directly with the file
descriptor.

It's good to stay up-to-date in general, but the impetus to do this now
is that in https://github.com/neondatabase/neon/pull/11929, I want to
use some functions that were added only in the latest version of 'nix',
and it's nice to not have to build multiple versions. (Although,
different versions of 'nix' are still pulled in as indirect dependencies
from other packages)
2025-05-16 14:45:08 +00:00
Folke Behrens
baafcc5d41 proxy: Fix misspelled flag value alias, swap names and aliases (#11949)
## Problem

There's a misspelled flag value alias that's not really used anywhere.

## Summary of changes

Fix the alias and make aliases the official flag values and keep old
values as aliases.
Also rename enum variant. No need for it to carry the version now.
2025-05-16 14:12:39 +00:00
Evan Fleming
aa22572d8c safekeeper: refactor static remote storage usage to use Arc (#10179)
Greetings! Please add `w=1` to github url when viewing diff
(sepcifically `wal_backup.rs`)

## Problem

This PR is aimed at addressing the remaining work of #8200. Namely,
removing static usage of remote storage in favour of arc. I did not opt
to pass `Arc<RemoteStorage>` directly since it is actually
`Optional<RemoteStorage>` as it is not necessarily always configured. I
wanted to avoid having to pass `Arc<Optional<RemoteStorage>>` everywhere
with individual consuming functions likely needing to handle unwrapping.

Instead I've added a `WalBackup` struct that holds
`Optional<RemoteStorage>` and handles initialization/unwrapping
RemoteStorage internally. wal_backup functions now take self and
`Arc<WalBackup>` is passed as a dependency through the various consumers
that need it.

## Summary of changes
- Add `WalBackup` that holds `Optional<RemoteStorage>` and handles
initialization and unwrapping
- Modify wal_backup functions to take `WalBackup` as self (Add `w=1` to
github url when viewing diff here)
- Initialize `WalBackup` in safekeeper root
- Store `Arc<WalBackup>` in `GlobalTimelineMap` and pass and store in
each Timeline as loaded
- use `WalBackup` through Timeline as needed

## Refs

- task to remove global variables
https://github.com/neondatabase/neon/issues/8200
- drive-by fixes https://github.com/neondatabase/neon/issues/11501 
by turning the panic reported there into an error `remote storage not
configured`

---------

Co-authored-by: Christian Schwarz <christian@neon.tech>
2025-05-16 12:41:10 +00:00
Arpad Müller
2d247375b3 Update rust to 1.87.0 (#11938)
We keep the practice of keeping the compiler up to date, pointing to the
latest release. This is done by many other projects in the Rust
ecosystem as well.

The 1.87.0 release marks 10 years of Rust.

[Announcement blog
post](https://blog.rust-lang.org/2025/05/15/Rust-1.87.0/)

Prior update was in #11431
2025-05-16 12:21:24 +00:00
41 changed files with 1428 additions and 590 deletions

View File

@@ -963,7 +963,7 @@ jobs:
fi
- name: Verify docker-compose example and test extensions
timeout-minutes: 20
timeout-minutes: 60
env:
TAG: >-
${{

37
Cargo.lock generated
View File

@@ -1112,6 +1112,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "cfg_aliases"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "cgroups-rs"
version = "0.3.3"
@@ -1306,7 +1312,7 @@ dependencies = [
"itertools 0.10.5",
"jsonwebtoken",
"metrics",
"nix 0.27.1",
"nix 0.30.1",
"notify",
"num_cpus",
"once_cell",
@@ -1429,7 +1435,7 @@ dependencies = [
"humantime-serde",
"hyper 0.14.30",
"jsonwebtoken",
"nix 0.27.1",
"nix 0.30.1",
"once_cell",
"pageserver_api",
"pageserver_client",
@@ -3512,9 +3518,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.169"
version = "0.2.172"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a"
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
[[package]]
name = "libloading"
@@ -3788,6 +3794,16 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "neon-shmem"
version = "0.1.0"
dependencies = [
"nix 0.30.1",
"tempfile",
"thiserror 1.0.69",
"workspace_hack",
]
[[package]]
name = "never-say-never"
version = "6.6.666"
@@ -3821,12 +3837,13 @@ dependencies = [
[[package]]
name = "nix"
version = "0.27.1"
version = "0.30.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053"
checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6"
dependencies = [
"bitflags 2.8.0",
"cfg-if",
"cfg_aliases",
"libc",
"memoffset 0.9.0",
]
@@ -4280,7 +4297,7 @@ dependencies = [
"jsonwebtoken",
"md5",
"metrics",
"nix 0.27.1",
"nix 0.30.1",
"num-traits",
"num_cpus",
"once_cell",
@@ -4356,7 +4373,7 @@ dependencies = [
"humantime",
"humantime-serde",
"itertools 0.10.5",
"nix 0.27.1",
"nix 0.30.1",
"once_cell",
"postgres_backend",
"postgres_ffi",
@@ -5188,6 +5205,7 @@ dependencies = [
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"rustls-pemfile 2.1.1",
"ryu",
"scopeguard",
"serde",
"serde_json",
@@ -7899,7 +7917,7 @@ dependencies = [
"humantime",
"jsonwebtoken",
"metrics",
"nix 0.27.1",
"nix 0.30.1",
"once_cell",
"pem",
"pin-project-lite",
@@ -8475,6 +8493,7 @@ dependencies = [
"log",
"memchr",
"nix 0.26.4",
"nix 0.30.1",
"nom",
"num",
"num-bigint",

View File

@@ -23,6 +23,7 @@ members = [
"libs/postgres_ffi",
"libs/safekeeper_api",
"libs/desim",
"libs/neon-shmem",
"libs/utils",
"libs/consumption_metrics",
"libs/postgres_backend",
@@ -127,7 +128,7 @@ md5 = "0.7.0"
measured = { version = "0.0.22", features=["lasso"] }
measured-process = { version = "0.0.22" }
memoffset = "0.9"
nix = { version = "0.27", features = ["dir", "fs", "process", "socket", "signal", "poll"] }
nix = { version = "0.30.1", features = ["dir", "fs", "mman", "process", "socket", "signal", "poll"] }
# Do not update to >= 7.0.0, at least. The update will have a significant impact
# on compute startup metrics (start_postgres_ms), >= 25% degradation.
notify = "6.0.0"

View File

@@ -292,7 +292,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.86.0
ENV RUSTC_VERSION=1.87.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG RUSTFILT_VERSION=0.2.1

View File

@@ -14,7 +14,7 @@
use std::ffi::OsStr;
use std::io::Write;
use std::os::unix::prelude::AsRawFd;
use std::os::fd::AsFd;
use std::os::unix::process::CommandExt;
use std::path::Path;
use std::process::Command;
@@ -356,7 +356,7 @@ where
let file = pid_file::claim_for_current_process(&path).expect("claim pid file");
// Remove the FD_CLOEXEC flag on the pidfile descriptor so that the pidfile
// remains locked after exec.
nix::fcntl::fcntl(file.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::empty()))
nix::fcntl::fcntl(file.as_fd(), FcntlArg::F_SETFD(FdFlag::empty()))
.expect("remove FD_CLOEXEC");
// Don't run drop(file), it would close the file before we actually exec.
std::mem::forget(file);

View File

@@ -8,7 +8,6 @@
use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
@@ -31,7 +30,7 @@ use control_plane::safekeeper::SafekeeperNode;
use control_plane::storage_controller::{
NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
};
use nix::fcntl::{FlockArg, flock};
use nix::fcntl::{Flock, FlockArg};
use pageserver_api::config::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
@@ -749,16 +748,16 @@ struct TimelineTreeEl {
/// A flock-based guard over the neon_local repository directory
struct RepoLock {
_file: File,
_file: Flock<File>,
}
impl RepoLock {
fn new() -> Result<Self> {
let repo_dir = File::open(local_env::base_path())?;
let repo_dir_fd = repo_dir.as_raw_fd();
flock(repo_dir_fd, FlockArg::LockExclusive)?;
Ok(Self { _file: repo_dir })
match Flock::lock(repo_dir, FlockArg::LockExclusive) {
Ok(f) => Ok(Self { _file: f }),
Err((_, e)) => Err(e).context("flock error"),
}
}
}

View File

@@ -0,0 +1,13 @@
[package]
name = "neon-shmem"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
thiserror.workspace = true
nix.workspace=true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
[target.'cfg(target_os = "macos")'.dependencies]
tempfile = "3.14.0"

418
libs/neon-shmem/src/lib.rs Normal file
View File

@@ -0,0 +1,418 @@
//! Shared memory utilities for neon communicator
use std::num::NonZeroUsize;
use std::os::fd::{AsFd, BorrowedFd, OwnedFd};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
use nix::errno::Errno;
use nix::sys::mman::MapFlags;
use nix::sys::mman::ProtFlags;
use nix::sys::mman::mmap as nix_mmap;
use nix::sys::mman::munmap as nix_munmap;
use nix::unistd::ftruncate as nix_ftruncate;
/// ShmemHandle represents a shared memory area that can be shared by processes over fork().
/// Unlike shared memory allocated by Postgres, this area is resizable, up to 'max_size' that's
/// specified at creation.
///
/// The area is backed by an anonymous file created with memfd_create(). The full address space for
/// 'max_size' is reserved up-front with mmap(), but whenever you call [`ShmemHandle::set_size`],
/// the underlying file is resized. Do not access the area beyond the current size. Currently, that
/// will cause the file to be expanded, but we might use mprotect() etc. to enforce that in the
/// future.
pub struct ShmemHandle {
/// memfd file descriptor
fd: OwnedFd,
max_size: usize,
// Pointer to the beginning of the shared memory area. The header is stored there.
shared_ptr: NonNull<SharedStruct>,
// Pointer to the beginning of the user data
pub data_ptr: NonNull<u8>,
}
/// This is stored at the beginning in the shared memory area.
struct SharedStruct {
max_size: usize,
/// Current size of the backing file. The high-order bit is used for the RESIZE_IN_PROGRESS flag
current_size: AtomicUsize,
}
const RESIZE_IN_PROGRESS: usize = 1 << 63;
const HEADER_SIZE: usize = std::mem::size_of::<SharedStruct>();
/// Error type returned by the ShmemHandle functions.
#[derive(thiserror::Error, Debug)]
#[error("{msg}: {errno}")]
pub struct Error {
pub msg: String,
pub errno: Errno,
}
impl Error {
fn new(msg: &str, errno: Errno) -> Error {
Error {
msg: msg.to_string(),
errno,
}
}
}
impl ShmemHandle {
/// Create a new shared memory area. To communicate between processes, the processes need to be
/// fork()'d after calling this, so that the ShmemHandle is inherited by all processes.
///
/// If the ShmemHandle is dropped, the memory is unmapped from the current process. Other
/// processes can continue using it, however.
pub fn new(name: &str, initial_size: usize, max_size: usize) -> Result<ShmemHandle, Error> {
// create the backing anonymous file.
let fd = create_backing_file(name)?;
Self::new_with_fd(fd, initial_size, max_size)
}
fn new_with_fd(
fd: OwnedFd,
initial_size: usize,
max_size: usize,
) -> Result<ShmemHandle, Error> {
// We reserve the high-order bit for the RESIZE_IN_PROGRESS flag, and the actual size
// is a little larger than this because of the SharedStruct header. Make the upper limit
// somewhat smaller than that, because with anything close to that, you'll run out of
// memory anyway.
if max_size >= 1 << 48 {
panic!("max size {} too large", max_size);
}
if initial_size > max_size {
panic!("initial size {initial_size} larger than max size {max_size}");
}
// The actual initial / max size is the one given by the caller, plus the size of
// 'SharedStruct'.
let initial_size = HEADER_SIZE + initial_size;
let max_size = NonZeroUsize::new(HEADER_SIZE + max_size).unwrap();
// Reserve address space for it with mmap
//
// TODO: Use MAP_HUGETLB if possible
let start_ptr = unsafe {
nix_mmap(
None,
max_size,
ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
MapFlags::MAP_SHARED,
&fd,
0,
)
}
.map_err(|e| Error::new("mmap failed: {e}", e))?;
// Reserve space for the initial size
enlarge_file(fd.as_fd(), initial_size as u64)?;
// Initialize the header
let shared: NonNull<SharedStruct> = start_ptr.cast();
unsafe {
shared.write(SharedStruct {
max_size: max_size.into(),
current_size: AtomicUsize::new(initial_size),
})
};
// The user data begins after the header
let data_ptr = unsafe { start_ptr.cast().add(HEADER_SIZE) };
Ok(ShmemHandle {
fd,
max_size: max_size.into(),
shared_ptr: shared,
data_ptr,
})
}
// return reference to the header
fn shared(&self) -> &SharedStruct {
unsafe { self.shared_ptr.as_ref() }
}
/// Resize the shared memory area. 'new_size' must not be larger than the 'max_size' specified
/// when creating the area.
///
/// This may only be called from one process/thread concurrently. We detect that case
/// and return an Error.
pub fn set_size(&self, new_size: usize) -> Result<(), Error> {
let new_size = new_size + HEADER_SIZE;
let shared = self.shared();
if new_size > self.max_size {
panic!(
"new size ({} is greater than max size ({})",
new_size, self.max_size
);
}
assert_eq!(self.max_size, shared.max_size);
// Lock the area by setting the bit in 'current_size'
//
// Ordering::Relaxed would probably be sufficient here, as we don't access any other memory
// and the posix_fallocate/ftruncate call is surely a synchronization point anyway. But
// since this is not performance-critical, better safe than sorry .
let mut old_size = shared.current_size.load(Ordering::Acquire);
loop {
if (old_size & RESIZE_IN_PROGRESS) != 0 {
return Err(Error::new(
"concurrent resize detected",
Errno::UnknownErrno,
));
}
match shared.current_size.compare_exchange(
old_size,
new_size,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(x) => old_size = x,
}
}
// Ok, we got the lock.
//
// NB: If anything goes wrong, we *must* clear the bit!
let result = {
use std::cmp::Ordering::{Equal, Greater, Less};
match new_size.cmp(&old_size) {
Less => nix_ftruncate(&self.fd, new_size as i64).map_err(|e| {
Error::new("could not shrink shmem segment, ftruncate failed: {e}", e)
}),
Equal => Ok(()),
Greater => enlarge_file(self.fd.as_fd(), new_size as u64),
}
};
// Unlock
shared.current_size.store(
if result.is_ok() { new_size } else { old_size },
Ordering::Release,
);
result
}
/// Returns the current user-visible size of the shared memory segment.
///
/// NOTE: a concurrent set_size() call can change the size at any time. It is the caller's
/// responsibility not to access the area beyond the current size.
pub fn current_size(&self) -> usize {
let total_current_size =
self.shared().current_size.load(Ordering::Relaxed) & !RESIZE_IN_PROGRESS;
total_current_size - HEADER_SIZE
}
}
impl Drop for ShmemHandle {
fn drop(&mut self) {
// SAFETY: The pointer was obtained from mmap() with the given size.
// We unmap the entire region.
let _ = unsafe { nix_munmap(self.shared_ptr.cast(), self.max_size) };
// The fd is dropped automatically by OwnedFd.
}
}
/// Create a "backing file" for the shared memory area. On Linux, use memfd_create(), to create an
/// anonymous in-memory file. One macos, fall back to a regular file. That's good enough for
/// development and testing, but in production we want the file to stay in memory.
///
/// disable 'unused_variables' warnings, because in the macos path, 'name' is unused.
#[allow(unused_variables)]
fn create_backing_file(name: &str) -> Result<OwnedFd, Error> {
#[cfg(not(target_os = "macos"))]
{
nix::sys::memfd::memfd_create(name, nix::sys::memfd::MFdFlags::empty())
.map_err(|e| Error::new("memfd_create failed: {e}", e))
}
#[cfg(target_os = "macos")]
{
let file = tempfile::tempfile().map_err(|e| {
Error::new(
"could not create temporary file to back shmem area: {e}",
nix::errno::Errno::from_raw(e.raw_os_error().unwrap_or(0)),
)
})?;
Ok(OwnedFd::from(file))
}
}
fn enlarge_file(fd: BorrowedFd, size: u64) -> Result<(), Error> {
// Use posix_fallocate() to enlarge the file. It reserves the space correctly, so that
// we don't get a segfault later when trying to actually use it.
#[cfg(not(target_os = "macos"))]
{
nix::fcntl::posix_fallocate(fd, 0, size as i64).map_err(|e| {
Error::new(
"could not grow shmem segment, posix_fallocate failed: {e}",
e,
)
})
}
// As a fallback on macos, which doesn't have posix_fallocate, use plain 'fallocate'
#[cfg(target_os = "macos")]
{
nix::unistd::ftruncate(fd, size as i64)
.map_err(|e| Error::new("could not grow shmem segment, ftruncate failed: {e}", e))
}
}
#[cfg(test)]
mod tests {
use super::*;
use nix::unistd::ForkResult;
use std::ops::Range;
/// check that all bytes in given range have the expected value.
fn assert_range(ptr: *const u8, expected: u8, range: Range<usize>) {
for i in range {
let b = unsafe { *(ptr.add(i)) };
assert_eq!(expected, b, "unexpected byte at offset {}", i);
}
}
/// Write 'b' to all bytes in the given range
fn write_range(ptr: *mut u8, b: u8, range: Range<usize>) {
unsafe { std::ptr::write_bytes(ptr.add(range.start), b, range.end - range.start) };
}
// simple single-process test of growing and shrinking
#[test]
fn test_shmem_resize() -> Result<(), Error> {
let max_size = 1024 * 1024;
let init_struct = ShmemHandle::new("test_shmem_resize", 0, max_size)?;
assert_eq!(init_struct.current_size(), 0);
// Initial grow
let size1 = 10000;
init_struct.set_size(size1).unwrap();
assert_eq!(init_struct.current_size(), size1);
// Write some data
let data_ptr = init_struct.data_ptr.as_ptr();
write_range(data_ptr, 0xAA, 0..size1);
assert_range(data_ptr, 0xAA, 0..size1);
// Shrink
let size2 = 5000;
init_struct.set_size(size2).unwrap();
assert_eq!(init_struct.current_size(), size2);
// Grow again
let size3 = 20000;
init_struct.set_size(size3).unwrap();
assert_eq!(init_struct.current_size(), size3);
// Try to read it. The area that was shrunk and grown again should read as all zeros now
assert_range(data_ptr, 0xAA, 0..5000);
assert_range(data_ptr, 0, 5000..size1);
// Try to grow beyond max_size
//let size4 = max_size + 1;
//assert!(init_struct.set_size(size4).is_err());
// Dropping init_struct should unmap the memory
drop(init_struct);
Ok(())
}
/// This is used in tests to coordinate between test processes. It's like std::sync::Barrier,
/// but is stored in the shared memory area and works across processes. It's implemented by
/// polling, because e.g. standard rust mutexes are not guaranteed to work across processes.
struct SimpleBarrier {
num_procs: usize,
count: AtomicUsize,
}
impl SimpleBarrier {
unsafe fn init(ptr: *mut SimpleBarrier, num_procs: usize) {
unsafe {
*ptr = SimpleBarrier {
num_procs,
count: AtomicUsize::new(0),
}
}
}
pub fn wait(&self) {
let old = self.count.fetch_add(1, Ordering::Relaxed);
let generation = old / self.num_procs;
let mut current = old + 1;
while current < (generation + 1) * self.num_procs {
std::thread::sleep(std::time::Duration::from_millis(10));
current = self.count.load(Ordering::Relaxed);
}
}
}
#[test]
fn test_multi_process() {
// Initialize
let max_size = 1_000_000_000_000;
let init_struct = ShmemHandle::new("test_multi_process", 0, max_size).unwrap();
let ptr = init_struct.data_ptr.as_ptr();
// Store the SimpleBarrier in the first 1k of the area.
init_struct.set_size(10000).unwrap();
let barrier_ptr: *mut SimpleBarrier = unsafe {
ptr.add(ptr.align_offset(std::mem::align_of::<SimpleBarrier>()))
.cast()
};
unsafe { SimpleBarrier::init(barrier_ptr, 2) };
let barrier = unsafe { barrier_ptr.as_ref().unwrap() };
// Fork another test process. The code after this runs in both processes concurrently.
let fork_result = unsafe { nix::unistd::fork().unwrap() };
// In the parent, fill bytes between 1000..2000. In the child, between 2000..3000
if fork_result.is_parent() {
write_range(ptr, 0xAA, 1000..2000);
} else {
write_range(ptr, 0xBB, 2000..3000);
}
barrier.wait();
// Verify the contents. (in both processes)
assert_range(ptr, 0xAA, 1000..2000);
assert_range(ptr, 0xBB, 2000..3000);
// Grow, from the child this time
let size = 10_000_000;
if !fork_result.is_parent() {
init_struct.set_size(size).unwrap();
}
barrier.wait();
// make some writes at the end
if fork_result.is_parent() {
write_range(ptr, 0xAA, (size - 10)..size);
} else {
write_range(ptr, 0xBB, (size - 20)..(size - 10));
}
barrier.wait();
// Verify the contents. (This runs in both processes)
assert_range(ptr, 0, (size - 1000)..(size - 20));
assert_range(ptr, 0xBB, (size - 20)..(size - 10));
assert_range(ptr, 0xAA, (size - 10)..size);
if let ForkResult::Parent { child } = fork_result {
nix::sys::wait::waitpid(child, None).unwrap();
}
}
}

View File

@@ -1,7 +1,7 @@
use std::borrow::Cow;
use std::fs::{self, File};
use std::io::{self, Write};
use std::os::fd::AsRawFd;
use std::os::fd::AsFd;
use camino::{Utf8Path, Utf8PathBuf};
@@ -210,13 +210,13 @@ pub fn overwrite(
/// Syncs the filesystem for the given file descriptor.
#[cfg_attr(target_os = "macos", allow(unused_variables))]
pub fn syncfs(fd: impl AsRawFd) -> anyhow::Result<()> {
pub fn syncfs(fd: impl AsFd) -> anyhow::Result<()> {
// Linux guarantees durability for syncfs.
// POSIX doesn't have syncfs, and further does not actually guarantee durability of sync().
#[cfg(target_os = "linux")]
{
use anyhow::Context;
nix::unistd::syncfs(fd.as_raw_fd()).context("syncfs")?;
nix::unistd::syncfs(fd).context("syncfs")?;
}
#[cfg(target_os = "macos")]
{

View File

@@ -11,9 +11,9 @@ pub fn rename_noreplace<P1: ?Sized + NixPath, P2: ?Sized + NixPath>(
#[cfg(all(target_os = "linux", target_env = "gnu"))]
{
nix::fcntl::renameat2(
None,
nix::fcntl::AT_FDCWD,
src,
None,
nix::fcntl::AT_FDCWD,
dst,
nix::fcntl::RenameFlags::RENAME_NOREPLACE,
)

View File

@@ -1,6 +1,6 @@
//! A module to create and read lock files.
//!
//! File locking is done using [`fcntl::flock`] exclusive locks.
//! File locking is done using [`nix::fcntl::Flock`] exclusive locks.
//! The only consumer of this module is currently
//! [`pid_file`](crate::pid_file). See the module-level comment
//! there for potential pitfalls with lock files that are used
@@ -9,26 +9,25 @@
use std::fs;
use std::io::{Read, Write};
use std::ops::Deref;
use std::os::unix::prelude::AsRawFd;
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use nix::errno::Errno::EAGAIN;
use nix::fcntl;
use nix::fcntl::{Flock, FlockArg};
use crate::crashsafe;
/// A handle to an open and unlocked, but not-yet-written lock file.
/// A handle to an open and flocked, but not-yet-written lock file.
/// Returned by [`create_exclusive`].
#[must_use]
pub struct UnwrittenLockFile {
path: Utf8PathBuf,
file: fs::File,
file: Flock<fs::File>,
}
/// Returned by [`UnwrittenLockFile::write_content`].
#[must_use]
pub struct LockFileGuard(fs::File);
pub struct LockFileGuard(Flock<fs::File>);
impl Deref for LockFileGuard {
type Target = fs::File;
@@ -67,17 +66,14 @@ pub fn create_exclusive(lock_file_path: &Utf8Path) -> anyhow::Result<UnwrittenLo
.open(lock_file_path)
.context("open lock file")?;
let res = fcntl::flock(
lock_file.as_raw_fd(),
fcntl::FlockArg::LockExclusiveNonblock,
);
let res = Flock::lock(lock_file, FlockArg::LockExclusiveNonblock);
match res {
Ok(()) => Ok(UnwrittenLockFile {
Ok(lock_file) => Ok(UnwrittenLockFile {
path: lock_file_path.to_owned(),
file: lock_file,
}),
Err(EAGAIN) => anyhow::bail!("file is already locked"),
Err(e) => Err(e).context("flock error"),
Err((_, EAGAIN)) => anyhow::bail!("file is already locked"),
Err((_, e)) => Err(e).context("flock error"),
}
}
@@ -105,32 +101,37 @@ pub enum LockFileRead {
/// Check the [`LockFileRead`] variants for details.
pub fn read_and_hold_lock_file(path: &Utf8Path) -> anyhow::Result<LockFileRead> {
let res = fs::OpenOptions::new().read(true).open(path);
let mut lock_file = match res {
let lock_file = match res {
Ok(f) => f,
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => return Ok(LockFileRead::NotExist),
_ => return Err(e).context("open lock file"),
},
};
let res = fcntl::flock(
lock_file.as_raw_fd(),
fcntl::FlockArg::LockExclusiveNonblock,
);
let res = Flock::lock(lock_file, FlockArg::LockExclusiveNonblock);
// We need the content regardless of lock success / failure.
// But, read it after flock so that, if it succeeded, the content is consistent.
let mut content = String::new();
lock_file
.read_to_string(&mut content)
.context("read lock file")?;
match res {
Ok(()) => Ok(LockFileRead::NotHeldByAnyProcess(
LockFileGuard(lock_file),
content,
)),
Err(EAGAIN) => Ok(LockFileRead::LockedByOtherProcess {
not_locked_file: lock_file,
content,
}),
Err(e) => Err(e).context("flock error"),
Ok(mut locked_file) => {
let mut content = String::new();
locked_file
.read_to_string(&mut content)
.context("read lock file")?;
Ok(LockFileRead::NotHeldByAnyProcess(
LockFileGuard(locked_file),
content,
))
}
Err((mut not_locked_file, EAGAIN)) => {
let mut content = String::new();
not_locked_file
.read_to_string(&mut content)
.context("read lock file")?;
Ok(LockFileRead::LockedByOtherProcess {
not_locked_file,
content,
})
}
Err((_, e)) => Err(e).context("flock error"),
}
}

View File

@@ -668,7 +668,9 @@ impl From<DownloadError> for UpdateError {
impl From<std::io::Error> for UpdateError {
fn from(value: std::io::Error) -> Self {
if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
if let Some(nix::errno::Errno::ENOSPC) =
value.raw_os_error().map(nix::errno::Errno::from_raw)
{
UpdateError::NoSpace
} else if value
.get_ref()

View File

@@ -408,7 +408,7 @@ impl OpenFiles {
/// error types may be elegible for retry.
pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
use nix::errno::Errno::*;
match e.raw_os_error().map(nix::errno::from_i32) {
match e.raw_os_error().map(nix::errno::Errno::from_raw) {
Some(EIO) => {
// Terminate on EIO because we no longer trust the device to store
// data safely, or to uphold persistence guarantees on fsync.

View File

@@ -124,9 +124,7 @@ pub(super) fn epoll_uring_error_to_std(
) -> std::io::Error {
match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
tokio_epoll_uring::Error::System(system) => std::io::Error::other(system),
}
}

View File

@@ -936,6 +936,44 @@ lfc_prewarm_main(Datum main_arg)
lfc_ctl->prewarm_workers[worker_id].completed = GetCurrentTimestamp();
}
void
lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks)
{
BufferTag tag;
FileCacheEntry *entry;
uint32 hash;
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return;
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (LFC_ENABLED())
{
for (BlockNumber blkno = 0; blkno < nblocks; blkno += lfc_blocks_per_chunk)
{
tag.blockNum = blkno;
hash = get_hash_value(lfc_hash, &tag);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry != NULL)
{
for (int i = 0; i < lfc_blocks_per_chunk; i++)
{
if (GET_STATE(entry, i) == AVAILABLE)
{
lfc_ctl->used_pages -= 1;
SET_STATE(entry, i, UNAVAILABLE);
}
}
}
}
}
LWLockRelease(lfc_lock);
}
/*
* Check if page is present in the cache.

View File

@@ -28,6 +28,7 @@ typedef struct FileCacheState
extern bool lfc_store_prefetch_result;
/* functions for local file cache */
extern void lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks);
extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, const void *const *buffers,
BlockNumber nblocks);

View File

@@ -919,9 +919,6 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
mdextend(reln, forkNum, blkno, buffer, skipFsync);
/* Update LFC in case of unlogged index build */
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
lfc_write(InfoFromSMgrRel(reln), forkNum, blkno, buffer);
return;
default:
@@ -1010,14 +1007,6 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
mdzeroextend(reln, forkNum, blocknum, nblocks, skipFsync);
/* Update LFC in case of unlogged index build */
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
{
for (int i = 0; i < nblocks; i++)
{
lfc_write(InfoFromSMgrRel(reln), forkNum, blocknum + i, buffer.data);
}
}
return;
default:
@@ -1617,9 +1606,6 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
#else
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
#endif
/* Update LFC in case of unlogged index build */
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer);
return;
default:
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
@@ -1685,9 +1671,6 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
/* Update LFC in case of unlogged index build */
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks);
return;
default:
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
@@ -2083,6 +2066,8 @@ neon_end_unlogged_build(SMgrRelation reln)
forknum);
forget_cached_relsize(InfoFromNInfoB(rinfob), forknum);
lfc_invalidate(InfoFromNInfoB(rinfob), forknum, nblocks);
mdclose(reln, forknum);
#ifndef DEBUG_COMPARE_LOCAL
/* use isRedo == true, so that we drop it immediately */

View File

@@ -73,6 +73,7 @@ rustc-hash.workspace = true
rustls.workspace = true
rustls-native-certs.workspace = true
rustls-pemfile.workspace = true
ryu = "1"
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -394,6 +394,7 @@ async fn handle_client(
}
}
#[allow(clippy::large_enum_variant)]
enum Connection {
Raw(tokio::net::TcpStream),
Tls(tokio_rustls::client::TlsStream<tokio::net::TcpStream>),

View File

@@ -43,11 +43,12 @@ project_build_tag!(BUILD_TAG);
use clap::{Parser, ValueEnum};
#[derive(Clone, Debug, ValueEnum)]
#[clap(rename_all = "kebab-case")]
enum AuthBackendType {
#[value(name("cplane-v1"), alias("control-plane"))]
ControlPlaneV1,
#[clap(alias("cplane-v1"))]
ControlPlane,
#[value(name("link"), alias("control-redirect"))]
#[clap(alias("link"))]
ConsoleRedirect,
#[cfg(any(test, feature = "testing"))]
@@ -707,7 +708,7 @@ fn build_auth_backend(
args: &ProxyCliArgs,
) -> anyhow::Result<Either<&'static auth::Backend<'static, ()>, &'static ConsoleRedirectBackend>> {
match &args.auth_backend {
AuthBackendType::ControlPlaneV1 => {
AuthBackendType::ControlPlane => {
let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?;
let project_info_cache_config: ProjectInfoCacheOptions =
args.project_info_cache.parse()?;
@@ -862,7 +863,7 @@ async fn configure_redis(
("irsa", _) => match (&args.redis_host, args.redis_port) {
(Some(host), Some(port)) => Some(
ConnectionWithCredentialsProvider::new_with_credentials_provider(
host.to_string(),
host.clone(),
port,
elasticache::CredentialsProvider::new(
args.aws_region.clone(),

View File

@@ -78,7 +78,7 @@ struct RequestContextInner {
#[derive(Clone, Debug)]
pub(crate) enum AuthMethod {
// aka passwordless, fka link
// aka link
ConsoleRedirect,
ScramSha256,
ScramSha256Plus,

356
proxy/src/logging/json.rs Normal file
View File

@@ -0,0 +1,356 @@
//! Vendoring of serde_json's string escaping code.
//!
//! <https://github.com/serde-rs/json/blob/c1826ebcccb1a520389c6b78ad3da15db279220d/src/ser.rs#L1514-L1552>
//! <https://github.com/serde-rs/json/blob/c1826ebcccb1a520389c6b78ad3da15db279220d/src/ser.rs#L2081-L2157>
//! Licensed by David Tolnay under MIT or Apache-2.0.
//!
//! With modifications by Conrad Ludgate on behalf of Neon.
use std::fmt::{self, Write};
use serde_json::ser::CharEscape;
#[must_use]
pub struct ValueSer<'buf> {
buf: &'buf mut Vec<u8>,
}
impl<'buf> ValueSer<'buf> {
pub fn new(buf: &'buf mut Vec<u8>) -> Self {
Self { buf }
}
#[inline]
pub fn serialize(self, value: &SerializedValue) {
self.buf.extend_from_slice(&value.0);
}
#[inline]
pub fn str(self, s: &str) {
format_escaped_str(self.buf, s);
}
#[inline]
pub fn str_args(self, s: fmt::Arguments) {
format_escaped_display(self.buf, s);
}
#[inline]
pub fn bytes_hex(self, s: &[u8]) {
self.str_args(format_args!("{s:x?}"));
}
#[inline]
pub fn int(self, x: impl itoa::Integer) {
write_int(x, self.buf);
}
#[inline]
pub fn float(self, x: impl ryu::Float) {
write_float(x, self.buf);
}
#[inline]
pub fn bool(self, x: bool) {
let bool = if x { "true" } else { "false" };
self.buf.extend_from_slice(bool.as_bytes());
}
#[inline]
pub fn map(self) -> MapSer<'buf> {
MapSer::new(self.buf)
}
#[inline]
#[expect(unused)]
pub fn list(self) -> ListSer<'buf> {
ListSer::new(self.buf)
}
}
pub struct MapSer<'buf> {
buf: &'buf mut Vec<u8>,
first: bool,
}
impl<'buf> MapSer<'buf> {
#[inline]
fn new(buf: &'buf mut Vec<u8>) -> Self {
buf.push(b'{');
Self { buf, first: true }
}
#[inline]
pub fn entry(&mut self, key: Escaped) -> ValueSer {
self.entry_inner(|b| key.write(b))
}
#[inline]
pub fn entry_escape(&mut self, key: &str) -> ValueSer {
self.entry_inner(|b| format_escaped_str(b, key))
}
#[inline]
pub fn entry_escape_args(&mut self, key: fmt::Arguments) -> ValueSer {
self.entry_inner(|b| format_escaped_display(b, key))
}
#[inline]
fn entry_inner(&mut self, f: impl FnOnce(&mut Vec<u8>)) -> ValueSer {
if !self.first {
self.buf.push(b',');
}
self.first = false;
f(self.buf);
self.buf.push(b':');
ValueSer { buf: self.buf }
}
}
impl Drop for MapSer<'_> {
fn drop(&mut self) {
self.buf.push(b'}');
}
}
pub struct ListSer<'buf> {
buf: &'buf mut Vec<u8>,
first: bool,
}
impl<'buf> ListSer<'buf> {
#[inline]
fn new(buf: &'buf mut Vec<u8>) -> Self {
buf.push(b'[');
Self { buf, first: true }
}
#[expect(unused)]
#[inline]
fn entry(&mut self) -> ValueSer {
if !self.first {
self.buf.push(b',');
}
self.first = false;
ValueSer { buf: self.buf }
}
}
impl Drop for ListSer<'_> {
fn drop(&mut self) {
self.buf.push(b']');
}
}
#[derive(Clone)]
pub struct SerializedValue(Box<[u8]>);
impl SerializedValue {
#[inline]
pub fn str(s: &str) -> Self {
let mut v = vec![];
v.reserve_exact(2 + s.len());
format_escaped_str(&mut v, s);
Self(v.into_boxed_slice())
}
#[inline]
pub fn str_args(s: fmt::Arguments) -> Self {
if let Some(s) = s.as_str() {
return Self::str(s);
}
let mut v = vec![];
format_escaped_display(&mut v, s);
Self(v.into_boxed_slice())
}
#[inline]
pub fn bytes_hex(s: &[u8]) -> Self {
Self::str_args(format_args!("{s:x?}"))
}
#[inline]
pub fn int(x: impl itoa::Integer) -> Self {
Self(itoa::Buffer::new().format(x).as_bytes().into())
}
#[inline]
pub fn float(x: impl ryu::Float) -> Self {
Self(ryu::Buffer::new().format(x).as_bytes().into())
}
#[inline]
pub fn bool(x: bool) -> Self {
let bool = if x { "true" } else { "false" };
Self(bool.as_bytes().into())
}
}
/// Represents a string that didn't need escaping because it's already valid json string.
#[derive(Clone, Copy)]
pub struct Escaped(&'static str);
impl Escaped {
pub const fn new(s: &'static str) -> Self {
let mut i = 0;
while i < s.len() {
let escape = ESCAPE[s.as_bytes()[i] as usize];
i += 1;
assert!(escape == 0, "const json string should not need escaping");
}
Self(s)
}
pub fn as_str(self) -> &'static str {
self.0
}
fn write(self, buf: &mut Vec<u8>) {
buf.push(b'"');
buf.extend_from_slice(self.0.as_bytes());
buf.push(b'"');
}
}
fn write_int(x: impl itoa::Integer, b: &mut Vec<u8>) {
b.extend_from_slice(itoa::Buffer::new().format(x).as_bytes());
}
fn write_float(x: impl ryu::Float, b: &mut Vec<u8>) {
b.extend_from_slice(ryu::Buffer::new().format(x).as_bytes());
}
#[inline]
fn char_escape_from_escape_table(escape: u8, byte: u8) -> CharEscape {
match escape {
self::BB => CharEscape::Backspace,
self::TT => CharEscape::Tab,
self::NN => CharEscape::LineFeed,
self::FF => CharEscape::FormFeed,
self::RR => CharEscape::CarriageReturn,
self::QU => CharEscape::Quote,
self::BS => CharEscape::ReverseSolidus,
self::UU => CharEscape::AsciiControl(byte),
_ => unreachable!(),
}
}
fn format_escaped_str(writer: &mut Vec<u8>, value: &str) {
writer.push(b'"');
let rest = format_escaped_str_contents(writer, value);
writer.extend_from_slice(rest);
writer.push(b'"');
}
fn format_escaped_display(writer: &mut Vec<u8>, args: fmt::Arguments) {
writer.push(b'"');
if let Some(s) = args.as_str() {
let rest = format_escaped_str_contents(writer, s);
writer.extend_from_slice(rest);
} else {
Collect { buf: writer }
.write_fmt(args)
.expect("formatting should not error");
}
writer.push(b'"');
}
struct Collect<'buf> {
buf: &'buf mut Vec<u8>,
}
impl fmt::Write for Collect<'_> {
fn write_str(&mut self, s: &str) -> fmt::Result {
let last = format_escaped_str_contents(self.buf, s);
self.buf.extend(last);
Ok(())
}
}
// writes any escape sequences, and returns the suffix still needed to be written.
fn format_escaped_str_contents<'a>(writer: &mut Vec<u8>, value: &'a str) -> &'a [u8] {
let bytes = value.as_bytes();
let mut start = 0;
for (i, &byte) in bytes.iter().enumerate() {
let escape = ESCAPE[byte as usize];
if escape == 0 {
continue;
}
writer.extend_from_slice(&bytes[start..i]);
let char_escape = char_escape_from_escape_table(escape, byte);
write_char_escape(writer, char_escape);
start = i + 1;
}
&bytes[start..]
}
const BB: u8 = b'b'; // \x08
const TT: u8 = b't'; // \x09
const NN: u8 = b'n'; // \x0A
const FF: u8 = b'f'; // \x0C
const RR: u8 = b'r'; // \x0D
const QU: u8 = b'"'; // \x22
const BS: u8 = b'\\'; // \x5C
const UU: u8 = b'u'; // \x00...\x1F except the ones above
const __: u8 = 0;
// Lookup table of escape sequences. A value of b'x' at index i means that byte
// i is escaped as "\x" in JSON. A value of 0 means that byte i is not escaped.
static ESCAPE: [u8; 256] = [
// 1 2 3 4 5 6 7 8 9 A B C D E F
UU, UU, UU, UU, UU, UU, UU, UU, BB, TT, NN, UU, FF, RR, UU, UU, // 0
UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, UU, // 1
__, __, QU, __, __, __, __, __, __, __, __, __, __, __, __, __, // 2
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 3
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 4
__, __, __, __, __, __, __, __, __, __, __, __, BS, __, __, __, // 5
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 6
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 7
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 8
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // 9
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // A
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // B
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // C
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // D
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // E
__, __, __, __, __, __, __, __, __, __, __, __, __, __, __, __, // F
];
fn write_char_escape(writer: &mut Vec<u8>, char_escape: CharEscape) {
let s = match char_escape {
CharEscape::Quote => b"\\\"",
CharEscape::ReverseSolidus => b"\\\\",
CharEscape::Solidus => b"\\/",
CharEscape::Backspace => b"\\b",
CharEscape::FormFeed => b"\\f",
CharEscape::LineFeed => b"\\n",
CharEscape::CarriageReturn => b"\\r",
CharEscape::Tab => b"\\t",
CharEscape::AsciiControl(byte) => {
static HEX_DIGITS: [u8; 16] = *b"0123456789abcdef";
let bytes = &[
b'\\',
b'u',
b'0',
b'0',
HEX_DIGITS[(byte >> 4) as usize],
HEX_DIGITS[(byte & 0xF) as usize],
];
return writer.extend_from_slice(bytes);
}
};
writer.extend_from_slice(s);
}

File diff suppressed because it is too large Load Diff

View File

@@ -67,7 +67,6 @@ where
}
}
#[tracing::instrument(skip_all)]
pub async fn copy_bidirectional_client_compute<Client, Compute>(
client: &mut Client,
compute: &mut Compute,

View File

@@ -13,7 +13,6 @@ use crate::stream::Stream;
use crate::usage_metrics::{Ids, MetricCounterRecorder, USAGE_METRICS};
/// Forward bytes in both directions (client <-> compute).
#[tracing::instrument(skip_all)]
pub(crate) async fn proxy_pass(
client: impl AsyncRead + AsyncWrite + Unpin,
compute: impl AsyncRead + AsyncWrite + Unpin,

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.86.0"
channel = "1.87.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -22,9 +22,10 @@ use safekeeper::defaults::{
DEFAULT_PARTIAL_BACKUP_TIMEOUT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_SSL_CERT_FILE,
DEFAULT_SSL_CERT_RELOAD_PERIOD, DEFAULT_SSL_KEY_FILE,
};
use safekeeper::wal_backup::WalBackup;
use safekeeper::{
BACKGROUND_RUNTIME, BROKER_RUNTIME, GlobalTimelines, HTTP_RUNTIME, SafeKeeperConf,
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_backup, wal_service,
WAL_SERVICE_RUNTIME, broker, control_file, http, wal_service,
};
use sd_notify::NotifyState;
use storage_broker::{DEFAULT_ENDPOINT, Uri};
@@ -484,15 +485,15 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
None => None,
};
let global_timelines = Arc::new(GlobalTimelines::new(conf.clone()));
let wal_backup = Arc::new(WalBackup::new(&conf).await?);
let global_timelines = Arc::new(GlobalTimelines::new(conf.clone(), wal_backup.clone()));
// Register metrics collector for active timelines. It's important to do this
// after daemonizing, otherwise process collector will be upset.
let timeline_collector = safekeeper::metrics::TimelineCollector::new(global_timelines.clone());
metrics::register_internal(Box::new(timeline_collector))?;
wal_backup::init_remote_storage(&conf).await;
// Keep handles to main tasks to die if any of them disappears.
let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> =
FuturesUnordered::new();

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use anyhow::{Result, bail};
use camino::Utf8PathBuf;
use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
use remote_storage::GenericRemoteStorage;
use safekeeper_api::membership::Configuration;
use tokio::fs::OpenOptions;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
@@ -30,6 +31,7 @@ pub struct Request {
pub async fn handle_request(
request: Request,
global_timelines: Arc<GlobalTimelines>,
storage: Arc<GenericRemoteStorage>,
) -> Result<()> {
// TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :(
// if LSN will point to the middle of a WAL record, timeline will be in "broken" state
@@ -127,6 +129,7 @@ pub async fn handle_request(
assert!(first_ondisk_segment >= first_segment);
copy_s3_segments(
&storage,
wal_seg_size,
&request.source_ttid,
&request.destination_ttid,

View File

@@ -258,6 +258,7 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
let global_timelines = get_global_timelines(&request);
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
let storage = global_timelines.get_wal_backup().get_storage();
// To stream the body use wrap_stream which wants Stream of Result<Bytes>,
// so create the chan and write to it in another task.
@@ -269,6 +270,7 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
conf.my_id,
destination,
tx,
storage,
));
let rx_stream = ReceiverStream::new(rx);
@@ -390,12 +392,18 @@ async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Bo
);
let global_timelines = get_global_timelines(&request);
let wal_backup = global_timelines.get_wal_backup();
let storage = wal_backup
.get_storage()
.ok_or(ApiError::BadRequest(anyhow::anyhow!(
"Remote Storage is not configured"
)))?;
copy_timeline::handle_request(copy_timeline::Request{
source_ttid,
until_lsn: request_data.until_lsn,
destination_ttid: TenantTimelineId::new(source_ttid.tenant_id, request_data.target_timeline_id),
}, global_timelines)
}, global_timelines, storage)
.instrument(info_span!("copy_timeline", from=%source_ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
.await
.map_err(ApiError::InternalServerError)?;

View File

@@ -125,12 +125,6 @@ pub struct SafeKeeperConf {
pub enable_tls_wal_service_api: bool,
}
impl SafeKeeperConf {
pub fn is_wal_backup_enabled(&self) -> bool {
self.remote_storage.is_some() && self.wal_backup_enabled
}
}
impl SafeKeeperConf {
pub fn dummy() -> Self {
SafeKeeperConf {

View File

@@ -9,6 +9,7 @@ use chrono::{DateTime, Utc};
use futures::{SinkExt, StreamExt, TryStreamExt};
use http_utils::error::ApiError;
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
use remote_storage::GenericRemoteStorage;
use reqwest::Certificate;
use safekeeper_api::Term;
use safekeeper_api::models::{PullTimelineRequest, PullTimelineResponse, TimelineStatus};
@@ -43,6 +44,7 @@ pub async fn stream_snapshot(
source: NodeId,
destination: NodeId,
tx: mpsc::Sender<Result<Bytes>>,
storage: Option<Arc<GenericRemoteStorage>>,
) {
match tli.try_wal_residence_guard().await {
Err(e) => {
@@ -53,10 +55,32 @@ pub async fn stream_snapshot(
Ok(maybe_resident_tli) => {
if let Err(e) = match maybe_resident_tli {
Some(resident_tli) => {
stream_snapshot_resident_guts(resident_tli, source, destination, tx.clone())
.await
stream_snapshot_resident_guts(
resident_tli,
source,
destination,
tx.clone(),
storage,
)
.await
}
None => {
if let Some(storage) = storage {
stream_snapshot_offloaded_guts(
tli,
source,
destination,
tx.clone(),
&storage,
)
.await
} else {
tx.send(Err(anyhow!("remote storage not configured")))
.await
.ok();
return;
}
}
None => stream_snapshot_offloaded_guts(tli, source, destination, tx.clone()).await,
} {
// Error type/contents don't matter as they won't can't reach the client
// (hyper likely doesn't do anything with it), but http stream will be
@@ -123,10 +147,12 @@ pub(crate) async fn stream_snapshot_offloaded_guts(
source: NodeId,
destination: NodeId,
tx: mpsc::Sender<Result<Bytes>>,
storage: &GenericRemoteStorage,
) -> Result<()> {
let mut ar = prepare_tar_stream(tx);
tli.snapshot_offloaded(&mut ar, source, destination).await?;
tli.snapshot_offloaded(&mut ar, source, destination, storage)
.await?;
ar.finish().await?;
@@ -139,10 +165,13 @@ pub async fn stream_snapshot_resident_guts(
source: NodeId,
destination: NodeId,
tx: mpsc::Sender<Result<Bytes>>,
storage: Option<Arc<GenericRemoteStorage>>,
) -> Result<()> {
let mut ar = prepare_tar_stream(tx);
let bctx = tli.start_snapshot(&mut ar, source, destination).await?;
let bctx = tli
.start_snapshot(&mut ar, source, destination, storage)
.await?;
pausable_failpoint!("sk-snapshot-after-list-pausable");
let tli_dir = tli.get_timeline_dir();
@@ -182,6 +211,7 @@ impl Timeline {
ar: &mut tokio_tar::Builder<W>,
source: NodeId,
destination: NodeId,
storage: &GenericRemoteStorage,
) -> Result<()> {
// Take initial copy of control file, then release state lock
let mut control_file = {
@@ -216,6 +246,7 @@ impl Timeline {
// can fail if the timeline was un-evicted and modified in the background.
let remote_timeline_path = &self.remote_path;
wal_backup::copy_partial_segment(
storage,
&replace.previous.remote_path(remote_timeline_path),
&replace.current.remote_path(remote_timeline_path),
)
@@ -262,6 +293,7 @@ impl WalResidentTimeline {
ar: &mut tokio_tar::Builder<W>,
source: NodeId,
destination: NodeId,
storage: Option<Arc<GenericRemoteStorage>>,
) -> Result<SnapshotContext> {
let mut shared_state = self.write_shared_state().await;
let wal_seg_size = shared_state.get_wal_seg_size();
@@ -283,6 +315,7 @@ impl WalResidentTimeline {
let remote_timeline_path = &self.tli.remote_path;
wal_backup::copy_partial_segment(
&*storage.context("remote storage not configured")?,
&replace.previous.remote_path(remote_timeline_path),
&replace.current.remote_path(remote_timeline_path),
)

View File

@@ -18,7 +18,7 @@ use crate::send_wal::EndWatch;
use crate::state::{TimelinePersistentState, TimelineState};
use crate::timeline::{SharedState, StateSK, Timeline, get_timeline_dir};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::remote_timeline_path;
use crate::wal_backup::{WalBackup, remote_timeline_path};
use crate::{SafeKeeperConf, control_file, receive_wal, wal_storage};
/// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
@@ -101,18 +101,22 @@ impl Env {
let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
let wal_backup = Arc::new(WalBackup::new(&conf).await?);
let timeline = Timeline::new(
ttid,
&timeline_dir,
&remote_path,
shared_state,
conf.clone(),
wal_backup.clone(),
);
timeline.bootstrap(
&mut timeline.write_shared_state().await,
&conf,
Arc::new(TimelinesSet::default()), // ignored for now
RateLimiter::new(0, 0),
wal_backup,
);
Ok(timeline)
}

View File

@@ -35,7 +35,8 @@ use crate::state::{EvictionState, TimelineMemState, TimelinePersistentState, Tim
use crate::timeline_guard::ResidenceGuard;
use crate::timeline_manager::{AtomicStatus, ManagerCtl};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::{self, remote_timeline_path};
use crate::wal_backup;
use crate::wal_backup::{WalBackup, remote_timeline_path};
use crate::wal_backup_partial::PartialRemoteSegment;
use crate::wal_storage::{Storage as wal_storage_iface, WalReader};
use crate::{SafeKeeperConf, control_file, debug_dump, timeline_manager, wal_storage};
@@ -452,6 +453,8 @@ pub struct Timeline {
manager_ctl: ManagerCtl,
conf: Arc<SafeKeeperConf>,
pub(crate) wal_backup: Arc<WalBackup>,
remote_deletion: std::sync::Mutex<Option<RemoteDeletionReceiver>>,
/// Hold this gate from code that depends on the Timeline's non-shut-down state. While holding
@@ -476,6 +479,7 @@ impl Timeline {
remote_path: &RemotePath,
shared_state: SharedState,
conf: Arc<SafeKeeperConf>,
wal_backup: Arc<WalBackup>,
) -> Arc<Self> {
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
watch::channel(shared_state.sk.state().commit_lsn);
@@ -509,6 +513,7 @@ impl Timeline {
wal_backup_active: AtomicBool::new(false),
last_removed_segno: AtomicU64::new(0),
mgr_status: AtomicStatus::new(),
wal_backup,
})
}
@@ -516,6 +521,7 @@ impl Timeline {
pub fn load_timeline(
conf: Arc<SafeKeeperConf>,
ttid: TenantTimelineId,
wal_backup: Arc<WalBackup>,
) -> Result<Arc<Timeline>> {
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
@@ -529,6 +535,7 @@ impl Timeline {
&remote_path,
shared_state,
conf,
wal_backup,
))
}
@@ -539,6 +546,7 @@ impl Timeline {
conf: &SafeKeeperConf,
broker_active_set: Arc<TimelinesSet>,
partial_backup_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
) {
let (tx, rx) = self.manager_ctl.bootstrap_manager();
@@ -561,6 +569,7 @@ impl Timeline {
tx,
rx,
partial_backup_rate_limiter,
wal_backup,
)
.await
}
@@ -606,9 +615,10 @@ impl Timeline {
// it is cancelled, so WAL storage won't be opened again.
shared_state.sk.close_wal_store();
if !only_local && self.conf.is_wal_backup_enabled() {
if !only_local {
self.remote_delete().await?;
}
let dir_existed = delete_dir(&self.timeline_dir).await?;
Ok(dir_existed)
}
@@ -675,11 +685,20 @@ impl Timeline {
guard: &mut std::sync::MutexGuard<Option<RemoteDeletionReceiver>>,
) -> RemoteDeletionReceiver {
tracing::info!("starting remote deletion");
let storage = self.wal_backup.get_storage().clone();
let (result_tx, result_rx) = tokio::sync::watch::channel(None);
let ttid = self.ttid;
tokio::task::spawn(
async move {
let r = wal_backup::delete_timeline(&ttid).await;
let r = if let Some(storage) = storage {
wal_backup::delete_timeline(&storage, &ttid).await
} else {
tracing::info!(
"skipping remote deletion because no remote storage is configured; this effectively leaks the objects in remote storage"
);
Ok(())
};
if let Err(e) = &r {
// Log error here in case nobody ever listens for our result (e.g. dropped API request)
tracing::error!("remote deletion failed: {e}");
@@ -1046,14 +1065,13 @@ impl WalResidentTimeline {
pub async fn get_walreader(&self, start_lsn: Lsn) -> Result<WalReader> {
let (_, persisted_state) = self.get_state().await;
let enable_remote_read = self.conf.is_wal_backup_enabled();
WalReader::new(
&self.ttid,
self.timeline_dir.clone(),
&persisted_state,
start_lsn,
enable_remote_read,
self.wal_backup.clone(),
)
}

View File

@@ -6,7 +6,7 @@
use anyhow::Context;
use camino::Utf8PathBuf;
use remote_storage::RemotePath;
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncWriteExt};
use tracing::{debug, info, instrument, warn};
@@ -68,6 +68,10 @@ impl Manager {
#[instrument(name = "evict_timeline", skip_all)]
pub(crate) async fn evict_timeline(&mut self) -> bool {
assert!(!self.is_offloaded);
let Some(storage) = self.wal_backup.get_storage() else {
warn!("no remote storage configured, skipping uneviction");
return false;
};
let partial_backup_uploaded = match &self.partial_backup_uploaded {
Some(p) => p.clone(),
None => {
@@ -87,7 +91,7 @@ impl Manager {
.inc();
});
if let Err(e) = do_eviction(self, &partial_backup_uploaded).await {
if let Err(e) = do_eviction(self, &partial_backup_uploaded, &storage).await {
warn!("failed to evict timeline: {:?}", e);
return false;
}
@@ -102,6 +106,10 @@ impl Manager {
#[instrument(name = "unevict_timeline", skip_all)]
pub(crate) async fn unevict_timeline(&mut self) {
assert!(self.is_offloaded);
let Some(storage) = self.wal_backup.get_storage() else {
warn!("no remote storage configured, skipping uneviction");
return;
};
let partial_backup_uploaded = match &self.partial_backup_uploaded {
Some(p) => p.clone(),
None => {
@@ -121,7 +129,7 @@ impl Manager {
.inc();
});
if let Err(e) = do_uneviction(self, &partial_backup_uploaded).await {
if let Err(e) = do_uneviction(self, &partial_backup_uploaded, &storage).await {
warn!("failed to unevict timeline: {:?}", e);
return;
}
@@ -137,8 +145,12 @@ impl Manager {
/// Ensure that content matches the remote partial backup, if local segment exists.
/// Then change state in control file and in-memory. If `delete_offloaded_wal` is set,
/// delete the local segment.
async fn do_eviction(mgr: &mut Manager, partial: &PartialRemoteSegment) -> anyhow::Result<()> {
compare_local_segment_with_remote(mgr, partial).await?;
async fn do_eviction(
mgr: &mut Manager,
partial: &PartialRemoteSegment,
storage: &GenericRemoteStorage,
) -> anyhow::Result<()> {
compare_local_segment_with_remote(mgr, partial, storage).await?;
mgr.tli.switch_to_offloaded(partial).await?;
// switch manager state as soon as possible
@@ -153,12 +165,16 @@ async fn do_eviction(mgr: &mut Manager, partial: &PartialRemoteSegment) -> anyho
/// Ensure that content matches the remote partial backup, if local segment exists.
/// Then download segment to local disk and change state in control file and in-memory.
async fn do_uneviction(mgr: &mut Manager, partial: &PartialRemoteSegment) -> anyhow::Result<()> {
async fn do_uneviction(
mgr: &mut Manager,
partial: &PartialRemoteSegment,
storage: &GenericRemoteStorage,
) -> anyhow::Result<()> {
// if the local segment is present, validate it
compare_local_segment_with_remote(mgr, partial).await?;
compare_local_segment_with_remote(mgr, partial, storage).await?;
// atomically download the partial segment
redownload_partial_segment(mgr, partial).await?;
redownload_partial_segment(mgr, partial, storage).await?;
mgr.tli.switch_to_present().await?;
// switch manager state as soon as possible
@@ -181,6 +197,7 @@ async fn delete_local_segment(mgr: &Manager, partial: &PartialRemoteSegment) ->
async fn redownload_partial_segment(
mgr: &Manager,
partial: &PartialRemoteSegment,
storage: &GenericRemoteStorage,
) -> anyhow::Result<()> {
let tmp_file = mgr.tli.timeline_dir().join("remote_partial.tmp");
let remote_segfile = remote_segment_path(mgr, partial);
@@ -190,7 +207,7 @@ async fn redownload_partial_segment(
remote_segfile, tmp_file
);
let mut reader = wal_backup::read_object(&remote_segfile, 0).await?;
let mut reader = wal_backup::read_object(storage, &remote_segfile, 0).await?;
let mut file = File::create(&tmp_file).await?;
let actual_len = tokio::io::copy(&mut reader, &mut file).await?;
@@ -234,13 +251,16 @@ async fn redownload_partial_segment(
async fn compare_local_segment_with_remote(
mgr: &Manager,
partial: &PartialRemoteSegment,
storage: &GenericRemoteStorage,
) -> anyhow::Result<()> {
let local_path = local_segment_path(mgr, partial);
match File::open(&local_path).await {
Ok(mut local_file) => do_validation(mgr, &mut local_file, mgr.wal_seg_size, partial)
.await
.context("validation failed"),
Ok(mut local_file) => {
do_validation(mgr, &mut local_file, mgr.wal_seg_size, partial, storage)
.await
.context("validation failed")
}
Err(_) => {
info!(
"local WAL file {} is not present, skipping validation",
@@ -258,6 +278,7 @@ async fn do_validation(
file: &mut File,
wal_seg_size: usize,
partial: &PartialRemoteSegment,
storage: &GenericRemoteStorage,
) -> anyhow::Result<()> {
let local_size = file.metadata().await?.len() as usize;
if local_size != wal_seg_size {
@@ -270,7 +291,7 @@ async fn do_validation(
let remote_segfile = remote_segment_path(mgr, partial);
let mut remote_reader: std::pin::Pin<Box<dyn AsyncRead + Send + Sync>> =
wal_backup::read_object(&remote_segfile, 0).await?;
wal_backup::read_object(storage, &remote_segfile, 0).await?;
// remote segment should have bytes excatly up to `flush_lsn`
let expected_remote_size = partial.flush_lsn.segment_offset(mgr.wal_seg_size);

View File

@@ -35,7 +35,7 @@ use crate::state::TimelineState;
use crate::timeline::{ManagerTimeline, ReadGuardSharedState, StateSK, WalResidentTimeline};
use crate::timeline_guard::{AccessService, GuardId, ResidenceGuard};
use crate::timelines_set::{TimelineSetGuard, TimelinesSet};
use crate::wal_backup::{self, WalBackupTaskHandle};
use crate::wal_backup::{self, WalBackup, WalBackupTaskHandle};
use crate::wal_backup_partial::{self, PartialBackup, PartialRemoteSegment};
pub(crate) struct StateSnapshot {
@@ -200,6 +200,7 @@ pub(crate) struct Manager {
pub(crate) conf: SafeKeeperConf,
pub(crate) wal_seg_size: usize,
pub(crate) walsenders: Arc<WalSenders>,
pub(crate) wal_backup: Arc<WalBackup>,
// current state
pub(crate) state_version_rx: tokio::sync::watch::Receiver<usize>,
@@ -238,6 +239,7 @@ pub async fn main_task(
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
global_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
) {
tli.set_status(Status::Started);
@@ -256,6 +258,7 @@ pub async fn main_task(
broker_active_set,
manager_tx,
global_rate_limiter,
wal_backup,
)
.await;
@@ -371,7 +374,7 @@ pub async fn main_task(
mgr.tli_broker_active.set(false);
// shutdown background tasks
if mgr.conf.is_wal_backup_enabled() {
if let Some(storage) = mgr.wal_backup.get_storage() {
if let Some(backup_task) = mgr.backup_task.take() {
// If we fell through here, then the timeline is shutting down. This is important
// because otherwise joining on the wal_backup handle might hang.
@@ -379,7 +382,7 @@ pub async fn main_task(
backup_task.join().await;
}
wal_backup::update_task(&mut mgr, false, &last_state).await;
wal_backup::update_task(&mut mgr, storage, false, &last_state).await;
}
if let Some(recovery_task) = &mut mgr.recovery_task {
@@ -415,11 +418,13 @@ impl Manager {
broker_active_set: Arc<TimelinesSet>,
manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
global_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
) -> Manager {
let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
Manager {
wal_seg_size: tli.get_wal_seg_size().await,
walsenders: tli.get_walsenders().clone(),
wal_backup,
state_version_rx: tli.get_state_version_rx(),
num_computes_rx: tli.get_walreceivers().get_num_rx(),
tli_broker_active: broker_active_set.guard(tli.clone()),
@@ -477,8 +482,8 @@ impl Manager {
let is_wal_backup_required =
wal_backup::is_wal_backup_required(self.wal_seg_size, num_computes, state);
if self.conf.is_wal_backup_enabled() {
wal_backup::update_task(self, is_wal_backup_required, state).await;
if let Some(storage) = self.wal_backup.get_storage() {
wal_backup::update_task(self, storage, is_wal_backup_required, state).await;
}
// update the state in Arc<Timeline>
@@ -624,9 +629,9 @@ impl Manager {
/// Spawns partial WAL backup task if needed.
async fn update_partial_backup(&mut self, state: &StateSnapshot) {
// check if WAL backup is enabled and should be started
if !self.conf.is_wal_backup_enabled() {
let Some(storage) = self.wal_backup.get_storage() else {
return;
}
};
if self.partial_backup_task.is_some() {
// partial backup is already running
@@ -650,6 +655,7 @@ impl Manager {
self.conf.clone(),
self.global_rate_limiter.clone(),
cancel.clone(),
storage,
));
self.partial_backup_task = Some((handle, cancel));
}
@@ -669,6 +675,10 @@ impl Manager {
/// Reset partial backup state and remove its remote storage data. Since it
/// might concurrently uploading something, cancel the task first.
async fn backup_partial_reset(&mut self) -> anyhow::Result<Vec<String>> {
let Some(storage) = self.wal_backup.get_storage() else {
anyhow::bail!("remote storage is not enabled");
};
info!("resetting partial backup state");
// Force unevict timeline if it is evicted before erasing partial backup
// state. The intended use of this function is to drop corrupted remote
@@ -689,7 +699,7 @@ impl Manager {
}
let tli = self.wal_resident_timeline()?;
let mut partial_backup = PartialBackup::new(tli, self.conf.clone()).await;
let mut partial_backup = PartialBackup::new(tli, self.conf.clone(), storage).await;
// Reset might fail e.g. when cfile is already reset but s3 removal
// failed, so set manager state to None beforehand. In any case caller
// is expected to retry until success.

View File

@@ -25,6 +25,7 @@ use crate::rate_limit::RateLimiter;
use crate::state::TimelinePersistentState;
use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir};
use crate::timelines_set::TimelinesSet;
use crate::wal_backup::WalBackup;
use crate::wal_storage::Storage;
use crate::{SafeKeeperConf, control_file, wal_storage};
@@ -47,15 +48,24 @@ struct GlobalTimelinesState {
conf: Arc<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
global_rate_limiter: RateLimiter,
wal_backup: Arc<WalBackup>,
}
impl GlobalTimelinesState {
/// Get dependencies for a timeline constructor.
fn get_dependencies(&self) -> (Arc<SafeKeeperConf>, Arc<TimelinesSet>, RateLimiter) {
fn get_dependencies(
&self,
) -> (
Arc<SafeKeeperConf>,
Arc<TimelinesSet>,
RateLimiter,
Arc<WalBackup>,
) {
(
self.conf.clone(),
self.broker_active_set.clone(),
self.global_rate_limiter.clone(),
self.wal_backup.clone(),
)
}
@@ -84,7 +94,7 @@ pub struct GlobalTimelines {
impl GlobalTimelines {
/// Create a new instance of the global timelines map.
pub fn new(conf: Arc<SafeKeeperConf>) -> Self {
pub fn new(conf: Arc<SafeKeeperConf>, wal_backup: Arc<WalBackup>) -> Self {
Self {
state: Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
@@ -92,6 +102,7 @@ impl GlobalTimelines {
conf,
broker_active_set: Arc::new(TimelinesSet::default()),
global_rate_limiter: RateLimiter::new(1, 1),
wal_backup,
}),
}
}
@@ -147,7 +158,7 @@ impl GlobalTimelines {
/// just lock and unlock it for each timeline -- this function is called
/// during init when nothing else is running, so this is fine.
async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
let state = self.state.lock().unwrap();
state.get_dependencies()
};
@@ -162,7 +173,7 @@ impl GlobalTimelines {
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
{
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
match Timeline::load_timeline(conf.clone(), ttid) {
match Timeline::load_timeline(conf.clone(), ttid, wal_backup.clone()) {
Ok(tli) => {
let mut shared_state = tli.write_shared_state().await;
self.state
@@ -175,6 +186,7 @@ impl GlobalTimelines {
&conf,
broker_active_set.clone(),
partial_backup_rate_limiter.clone(),
wal_backup.clone(),
);
}
// If we can't load a timeline, it's most likely because of a corrupted
@@ -212,6 +224,10 @@ impl GlobalTimelines {
self.state.lock().unwrap().broker_active_set.clone()
}
pub fn get_wal_backup(&self) -> Arc<WalBackup> {
self.state.lock().unwrap().wal_backup.clone()
}
/// Create a new timeline with the given id. If the timeline already exists, returns
/// an existing timeline.
pub(crate) async fn create(
@@ -222,7 +238,7 @@ impl GlobalTimelines {
start_lsn: Lsn,
commit_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let (conf, _, _) = {
let (conf, _, _, _) = {
let state = self.state.lock().unwrap();
if let Ok(timeline) = state.get(&ttid) {
// Timeline already exists, return it.
@@ -267,7 +283,7 @@ impl GlobalTimelines {
check_tombstone: bool,
) -> Result<Arc<Timeline>> {
// Check for existence and mark that we're creating it.
let (conf, broker_active_set, partial_backup_rate_limiter) = {
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
let mut state = self.state.lock().unwrap();
match state.timelines.get(&ttid) {
Some(GlobalMapTimeline::CreationInProgress) => {
@@ -296,7 +312,14 @@ impl GlobalTimelines {
};
// Do the actual move and reflect the result in the map.
match GlobalTimelines::install_temp_timeline(ttid, tmp_path, conf.clone()).await {
match GlobalTimelines::install_temp_timeline(
ttid,
tmp_path,
conf.clone(),
wal_backup.clone(),
)
.await
{
Ok(timeline) => {
let mut timeline_shared_state = timeline.write_shared_state().await;
let mut state = self.state.lock().unwrap();
@@ -314,6 +337,7 @@ impl GlobalTimelines {
&conf,
broker_active_set,
partial_backup_rate_limiter,
wal_backup,
);
drop(timeline_shared_state);
Ok(timeline)
@@ -336,6 +360,7 @@ impl GlobalTimelines {
ttid: TenantTimelineId,
tmp_path: &Utf8PathBuf,
conf: Arc<SafeKeeperConf>,
wal_backup: Arc<WalBackup>,
) -> Result<Arc<Timeline>> {
let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
let timeline_path = get_timeline_dir(conf.as_ref(), &ttid);
@@ -377,7 +402,7 @@ impl GlobalTimelines {
// Do the move.
durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
Timeline::load_timeline(conf, ttid)
Timeline::load_timeline(conf, ttid, wal_backup)
}
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,

View File

@@ -2,6 +2,7 @@ use std::cmp::min;
use std::collections::HashSet;
use std::num::NonZeroU32;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
@@ -17,7 +18,7 @@ use safekeeper_api::models::PeerInfo;
use tokio::fs::File;
use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::{OnceCell, watch};
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -63,7 +64,12 @@ pub(crate) fn is_wal_backup_required(
/// Based on peer information determine which safekeeper should offload; if it
/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task
/// is running, kill it.
pub(crate) async fn update_task(mgr: &mut Manager, need_backup: bool, state: &StateSnapshot) {
pub(crate) async fn update_task(
mgr: &mut Manager,
storage: Arc<GenericRemoteStorage>,
need_backup: bool,
state: &StateSnapshot,
) {
let (offloader, election_dbg_str) =
determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
let elected_me = Some(mgr.conf.my_id) == offloader;
@@ -82,7 +88,12 @@ pub(crate) async fn update_task(mgr: &mut Manager, need_backup: bool, state: &St
return;
};
let async_task = backup_task_main(resident, mgr.conf.backup_parallel_jobs, shutdown_rx);
let async_task = backup_task_main(
resident,
storage,
mgr.conf.backup_parallel_jobs,
shutdown_rx,
);
let handle = if mgr.conf.current_thread_runtime {
tokio::spawn(async_task)
@@ -169,33 +180,31 @@ fn determine_offloader(
}
}
static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::const_new();
// Storage must be configured and initialized when this is called.
fn get_configured_remote_storage() -> &'static GenericRemoteStorage {
REMOTE_STORAGE
.get()
.expect("failed to get remote storage")
.as_ref()
.unwrap()
pub struct WalBackup {
storage: Option<Arc<GenericRemoteStorage>>,
}
pub async fn init_remote_storage(conf: &SafeKeeperConf) {
// TODO: refactor REMOTE_STORAGE to avoid using global variables, and provide
// dependencies to all tasks instead.
REMOTE_STORAGE
.get_or_init(|| async {
if let Some(conf) = conf.remote_storage.as_ref() {
Some(
GenericRemoteStorage::from_config(conf)
.await
.expect("failed to create remote storage"),
)
} else {
None
impl WalBackup {
/// Create a new WalBackup instance.
pub async fn new(conf: &SafeKeeperConf) -> Result<Self> {
if !conf.wal_backup_enabled {
return Ok(Self { storage: None });
}
match conf.remote_storage.as_ref() {
Some(config) => {
let storage = GenericRemoteStorage::from_config(config).await?;
Ok(Self {
storage: Some(Arc::new(storage)),
})
}
})
.await;
None => Ok(Self { storage: None }),
}
}
pub fn get_storage(&self) -> Option<Arc<GenericRemoteStorage>> {
self.storage.clone()
}
}
struct WalBackupTask {
@@ -204,12 +213,14 @@ struct WalBackupTask {
wal_seg_size: usize,
parallel_jobs: usize,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
storage: Arc<GenericRemoteStorage>,
}
/// Offload single timeline.
#[instrument(name = "wal_backup", skip_all, fields(ttid = %tli.ttid))]
async fn backup_task_main(
tli: WalResidentTimeline,
storage: Arc<GenericRemoteStorage>,
parallel_jobs: usize,
mut shutdown_rx: Receiver<()>,
) {
@@ -223,6 +234,7 @@ async fn backup_task_main(
timeline_dir: tli.get_timeline_dir(),
timeline: tli,
parallel_jobs,
storage,
};
// task is spinned up only when wal_seg_size already initialized
@@ -293,6 +305,7 @@ impl WalBackupTask {
match backup_lsn_range(
&self.timeline,
self.storage.clone(),
&mut backup_lsn,
commit_lsn,
self.wal_seg_size,
@@ -322,6 +335,7 @@ impl WalBackupTask {
async fn backup_lsn_range(
timeline: &WalResidentTimeline,
storage: Arc<GenericRemoteStorage>,
backup_lsn: &mut Lsn,
end_lsn: Lsn,
wal_seg_size: usize,
@@ -352,7 +366,12 @@ async fn backup_lsn_range(
loop {
let added_task = match iter.next() {
Some(s) => {
uploads.push_back(backup_single_segment(s, timeline_dir, remote_timeline_path));
uploads.push_back(backup_single_segment(
&storage,
s,
timeline_dir,
remote_timeline_path,
));
true
}
None => false,
@@ -388,6 +407,7 @@ async fn backup_lsn_range(
}
async fn backup_single_segment(
storage: &GenericRemoteStorage,
seg: &Segment,
timeline_dir: &Utf8Path,
remote_timeline_path: &RemotePath,
@@ -395,7 +415,13 @@ async fn backup_single_segment(
let segment_file_path = seg.file_path(timeline_dir)?;
let remote_segment_path = seg.remote_path(remote_timeline_path);
let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
let res = backup_object(
storage,
&segment_file_path,
&remote_segment_path,
seg.size(),
)
.await;
if res.is_ok() {
BACKED_UP_SEGMENTS.inc();
} else {
@@ -455,12 +481,11 @@ fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
}
async fn backup_object(
storage: &GenericRemoteStorage,
source_file: &Utf8Path,
target_file: &RemotePath,
size: usize,
) -> Result<()> {
let storage = get_configured_remote_storage();
let file = File::open(&source_file)
.await
.with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
@@ -475,12 +500,11 @@ async fn backup_object(
}
pub(crate) async fn backup_partial_segment(
storage: &GenericRemoteStorage,
source_file: &Utf8Path,
target_file: &RemotePath,
size: usize,
) -> Result<()> {
let storage = get_configured_remote_storage();
let file = File::open(&source_file)
.await
.with_context(|| format!("Failed to open file {source_file:?} for wal backup"))?;
@@ -504,25 +528,20 @@ pub(crate) async fn backup_partial_segment(
}
pub(crate) async fn copy_partial_segment(
storage: &GenericRemoteStorage,
source: &RemotePath,
destination: &RemotePath,
) -> Result<()> {
let storage = get_configured_remote_storage();
let cancel = CancellationToken::new();
storage.copy_object(source, destination, &cancel).await
}
pub async fn read_object(
storage: &GenericRemoteStorage,
file_path: &RemotePath,
offset: u64,
) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead + Send + Sync>>> {
let storage = REMOTE_STORAGE
.get()
.context("Failed to get remote storage")?
.as_ref()
.context("No remote storage configured")?;
info!("segment download about to start from remote path {file_path:?} at offset {offset}");
let cancel = CancellationToken::new();
@@ -547,8 +566,10 @@ pub async fn read_object(
/// Delete WAL files for the given timeline. Remote storage must be configured
/// when called.
pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
let storage = get_configured_remote_storage();
pub async fn delete_timeline(
storage: &GenericRemoteStorage,
ttid: &TenantTimelineId,
) -> Result<()> {
let remote_path = remote_timeline_path(ttid)?;
// see DEFAULT_MAX_KEYS_PER_LIST_RESPONSE
@@ -618,14 +639,14 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
}
/// Used by wal_backup_partial.
pub async fn delete_objects(paths: &[RemotePath]) -> Result<()> {
pub async fn delete_objects(storage: &GenericRemoteStorage, paths: &[RemotePath]) -> Result<()> {
let cancel = CancellationToken::new(); // not really used
let storage = get_configured_remote_storage();
storage.delete_objects(paths, &cancel).await
}
/// Copy segments from one timeline to another. Used in copy_timeline.
pub async fn copy_s3_segments(
storage: &GenericRemoteStorage,
wal_seg_size: usize,
src_ttid: &TenantTimelineId,
dst_ttid: &TenantTimelineId,
@@ -634,12 +655,6 @@ pub async fn copy_s3_segments(
) -> Result<()> {
const SEGMENTS_PROGRESS_REPORT_INTERVAL: u64 = 1024;
let storage = REMOTE_STORAGE
.get()
.expect("failed to get remote storage")
.as_ref()
.unwrap();
let remote_dst_path = remote_timeline_path(dst_ttid)?;
let cancel = CancellationToken::new();

View File

@@ -19,9 +19,11 @@
//! file. Code updates state in the control file before doing any S3 operations.
//! This way control file stores information about all potentially existing
//! remote partial segments and can clean them up after uploading a newer version.
use std::sync::Arc;
use camino::Utf8PathBuf;
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo};
use remote_storage::RemotePath;
use remote_storage::{GenericRemoteStorage, RemotePath};
use safekeeper_api::Term;
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
@@ -154,12 +156,16 @@ pub struct PartialBackup {
conf: SafeKeeperConf,
local_prefix: Utf8PathBuf,
remote_timeline_path: RemotePath,
storage: Arc<GenericRemoteStorage>,
state: State,
}
impl PartialBackup {
pub async fn new(tli: WalResidentTimeline, conf: SafeKeeperConf) -> PartialBackup {
pub async fn new(
tli: WalResidentTimeline,
conf: SafeKeeperConf,
storage: Arc<GenericRemoteStorage>,
) -> PartialBackup {
let (_, persistent_state) = tli.get_state().await;
let wal_seg_size = tli.get_wal_seg_size().await;
@@ -173,6 +179,7 @@ impl PartialBackup {
conf,
local_prefix,
remote_timeline_path,
storage,
}
}
@@ -240,7 +247,8 @@ impl PartialBackup {
let remote_path = prepared.remote_path(&self.remote_timeline_path);
// Upload first `backup_bytes` bytes of the segment to the remote storage.
wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?;
wal_backup::backup_partial_segment(&self.storage, &local_path, &remote_path, backup_bytes)
.await?;
PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);
// We uploaded the segment, now let's verify that the data is still actual.
@@ -326,7 +334,7 @@ impl PartialBackup {
let remote_path = self.remote_timeline_path.join(seg);
objects_to_delete.push(remote_path);
}
wal_backup::delete_objects(&objects_to_delete).await
wal_backup::delete_objects(&self.storage, &objects_to_delete).await
}
/// Delete all non-Uploaded segments from the remote storage. There should be only one
@@ -424,6 +432,7 @@ pub async fn main_task(
conf: SafeKeeperConf,
limiter: RateLimiter,
cancel: CancellationToken,
storage: Arc<GenericRemoteStorage>,
) -> Option<PartialRemoteSegment> {
debug!("started");
let await_duration = conf.partial_backup_timeout;
@@ -432,7 +441,7 @@ pub async fn main_task(
let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
let mut backup = PartialBackup::new(tli, conf).await;
let mut backup = PartialBackup::new(tli, conf, storage).await;
debug!("state: {:?}", backup.state);

View File

@@ -21,6 +21,7 @@ use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo, dispatch_pgversion};
use pq_proto::SystemId;
use remote_storage::RemotePath;
use std::sync::Arc;
use tokio::fs::{self, File, OpenOptions, remove_file};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tracing::*;
@@ -32,7 +33,7 @@ use crate::metrics::{
REMOVED_WAL_SEGMENTS, WAL_STORAGE_OPERATION_SECONDS, WalStorageMetrics, time_io_closure,
};
use crate::state::TimelinePersistentState;
use crate::wal_backup::{read_object, remote_timeline_path};
use crate::wal_backup::{WalBackup, read_object, remote_timeline_path};
pub trait Storage {
// Last written LSN.
@@ -645,7 +646,7 @@ pub struct WalReader {
wal_segment: Option<Pin<Box<dyn AsyncRead + Send + Sync>>>,
// S3 will be used to read WAL if LSN is not available locally
enable_remote_read: bool,
wal_backup: Arc<WalBackup>,
// We don't have WAL locally if LSN is less than local_start_lsn
local_start_lsn: Lsn,
@@ -664,7 +665,7 @@ impl WalReader {
timeline_dir: Utf8PathBuf,
state: &TimelinePersistentState,
start_pos: Lsn,
enable_remote_read: bool,
wal_backup: Arc<WalBackup>,
) -> Result<Self> {
if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
bail!("state uninitialized, no data to read");
@@ -693,7 +694,7 @@ impl WalReader {
wal_seg_size: state.server.wal_seg_size as usize,
pos: start_pos,
wal_segment: None,
enable_remote_read,
wal_backup,
local_start_lsn: state.local_start_lsn,
timeline_start_lsn: state.timeline_start_lsn,
pg_version: state.server.pg_version / 10000,
@@ -812,9 +813,9 @@ impl WalReader {
}
// Try to open remote file, if remote reads are enabled
if self.enable_remote_read {
if let Some(storage) = self.wal_backup.get_storage() {
let remote_wal_file_path = self.remote_path.join(&wal_file_name);
return read_object(&remote_wal_file_path, xlogoff as u64).await;
return read_object(&storage, &remote_wal_file_path, xlogoff as u64).await;
}
bail!("WAL segment is not found")

View File

@@ -628,11 +628,7 @@ impl Scheduler {
tracing::trace!(%node_id, "attached_shard_count={} shard_count={} expected={}", node.attached_shard_count, node.shard_count, expected_attached_shards_per_node);
}
if node.attached_shard_count < expected_attached_shards_per_node {
expected_attached_shards_per_node - node.attached_shard_count
} else {
0
}
expected_attached_shards_per_node.saturating_sub(node.attached_shard_count)
}
pub(crate) fn expected_attached_shard_count(&self) -> usize {

View File

@@ -60,7 +60,8 @@ lazy_static = { version = "1", default-features = false, features = ["spin_no_st
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
memchr = { version = "2" }
nix = { version = "0.26" }
nix-2f80eeee3b1b6c7e = { package = "nix", version = "0.26" }
nix-fa1f6196edfd7249 = { package = "nix", version = "0.30", features = ["dir", "ioctl", "mman", "poll", "signal", "socket"] }
nom = { version = "7" }
num = { version = "0.4" }
num-bigint = { version = "0.4" }