Victor Polevoy
2025-07-10 15:34:48 +02:00
parent 22d708ea0e
commit 245db281f3
11 changed files with 120 additions and 9 deletions

View File

@@ -14,6 +14,7 @@
//! in-place and are at a high level achieved by expanding/reducing the bucket array and rebuilding the
//! dictionary by rehashing all keys.
use std::fmt::Debug;
use std::hash::{BuildHasher, Hash};
use std::mem::MaybeUninit;
@@ -41,6 +42,22 @@ pub struct HashMapInit<'a, K, V, S = rustc_hash::FxBuildHasher> {
num_buckets: u32,
}
impl<'a, K, V, S> Debug for HashMapInit<'a, K, V, S>
where
K: Debug,
V: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HashMapInit")
.field("shmem_handle", &self.shmem_handle)
.field("shared_ptr", &self.shared_ptr)
.field("shared_size", &self.shared_size)
// .field("hasher", &self.hasher)
.field("num_buckets", &self.num_buckets)
.finish()
}
}
/// This is a per-process handle to a hash table that (possibly) lives in shared memory.
/// If a child process is launched with fork(), the child process should
/// get its own HashMapAccess by calling HashMapInit::attach_writer/reader().
@@ -56,6 +73,20 @@ pub struct HashMapAccess<'a, K, V, S = rustc_hash::FxBuildHasher> {
unsafe impl<K: Sync, V: Sync, S> Sync for HashMapAccess<'_, K, V, S> {}
unsafe impl<K: Send, V: Send, S> Send for HashMapAccess<'_, K, V, S> {}
impl<'a, K, V, S> Debug for HashMapAccess<'a, K, V, S>
where
K: Debug,
V: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HashMapAccess")
.field("shmem_handle", &self.shmem_handle)
.field("shared_ptr", &self.shared_ptr)
// .field("hasher", &self.hasher)
.finish()
}
}
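// Illustrative only, not part of this commit: with the impl above, an access handle
// can be passed straight to the formatting machinery. `log_access` is a hypothetical
// helper, not an API of this crate.
fn log_access<K: Debug, V: Debug, S>(access: &HashMapAccess<'_, K, V, S>) {
    // Prints shmem_handle and shared_ptr; the hasher is intentionally omitted.
    eprintln!("{access:?}");
}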
impl<'a, K: Clone + Hash + Eq, V, S> HashMapInit<'a, K, V, S> {
/// Change the 'hasher' used by the hash table.
///

View File

@@ -1,5 +1,6 @@
//! Simple hash table with chaining.
use std::fmt::Debug;
use std::hash::Hash;
use std::mem::MaybeUninit;
@@ -17,6 +18,19 @@ pub(crate) struct Bucket<K, V> {
pub(crate) inner: Option<(K, V)>,
}
impl<K, V> Debug for Bucket<K, V>
where
K: Debug,
V: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Bucket")
.field("next", &self.next)
.field("inner", &self.inner)
.finish()
}
}
/// Core hash table implementation.
pub(crate) struct CoreHashMap<'a, K, V> {
/// Dictionary used to map hashes to bucket indices.
@@ -34,6 +48,22 @@ pub(crate) struct CoreHashMap<'a, K, V> {
pub(crate) _user_list_head: u32,
}
impl<'a, K, V> Debug for CoreHashMap<'a, K, V>
where
K: Debug,
V: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CoreHashMap")
.field("dictionary", &self.dictionary)
.field("buckets", &self.buckets)
.field("free_head", &self.free_head)
.field("alloc_limit", &self.alloc_limit)
.field("buckets_in_use", &self.buckets_in_use)
.finish()
}
}
/// Error for when there are no empty buckets left but one is needed.
#[derive(Debug, PartialEq)]
pub struct FullError();

View File

@@ -21,6 +21,7 @@ use nix::unistd::ftruncate as nix_ftruncate;
/// the underlying file is resized. Do not access the area beyond the current size. Currently, that
/// will cause the file to be expanded, but we might use `mprotect()` etc. to enforce that in the
/// future.
#[derive(Debug)]
pub struct ShmemHandle {
/// memfd file descriptor
fd: OwnedFd,
@@ -35,6 +36,7 @@ pub struct ShmemHandle {
}
/// This is stored at the beginning in the shared memory area.
#[derive(Debug)]
struct SharedStruct {
max_size: usize,

View File

@@ -22,6 +22,7 @@ pub type CacheBlock = u64;
pub const INVALID_CACHE_BLOCK: CacheBlock = u64::MAX;
#[derive(Debug)]
pub struct FileCache {
file: Arc<File>,
@@ -35,6 +36,7 @@ pub struct FileCache {
// TODO: We keep track of all free blocks in this vec. That doesn't really scale.
// Idea: when free_blocks fills up with more than 1024 entries, write them all to
// one block on disk.
#[derive(Debug)]
struct FreeList {
next_free_block: CacheBlock,
max_blocks: u64,

View File

@@ -46,6 +46,7 @@ pub struct IntegratedCacheInitStruct<'t> {
}
/// Represents write-access to the integrated cache. This is used by the communicator process.
#[derive(Debug)]
pub struct IntegratedCacheWriteAccess<'t> {
relsize_cache: neon_shmem::hash::HashMapAccess<'t, RelKey, RelEntry>,
block_map: neon_shmem::hash::HashMapAccess<'t, BlockKey, BlockEntry>,

View File

@@ -21,5 +21,31 @@ mod worker_process;
mod global_allocator;
// FIXME get this from postgres headers somehow
// FIXME: get this from postgres headers somehow
/// Size of a disk block: this also limits the size of a tuple. You can
/// set it bigger if you need bigger tuples (although `TOAST` should
/// reduce the need to have large tuples, since fields can be spread
/// across multiple tuples). [`BLCKSZ`] must be a power of `2`. The
/// maximum possible value of [`BLCKSZ`] is currently `2^15` (`32768`).
/// This is determined by the 15-bit widths of the `lp_off` and
/// `lp_len` fields in `ItemIdData` (see `include/storage/itemid.h`).
/// Changing [`BLCKSZ`] requires an `initdb`.
pub const BLCKSZ: usize = 8192;
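// Illustrative only, not part of this commit: the power-of-two and 2^15 upper-bound
// requirements described above could be enforced at compile time.
const _: () = assert!(BLCKSZ.is_power_of_two() && BLCKSZ <= 1 << 15);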
/// Define SLRU segment size. A page is the same [`BLCKSZ`] as is used
/// everywhere else in Postgres. The segment size can be chosen somewhat
/// arbitrarily; we make it `32` pages by default, or `256KB`, i.e. 1
/// million transactions for `CLOG` or `64K` transactions for
/// `SUBTRANS`.
///
/// # Note
///
/// Because TransactionIds are 32 bits and wrap around at `0xFFFFFFFF`,
/// page numbering also wraps around at `0xFFFFFFFF/xxxx_XACTS_PER_PAGE`
/// (where `xxxx` is `CLOG` or `SUBTRANS`, respectively), and segment
/// numbering at
/// `0xFFFFFFFF/xxxx_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT`. We need to
/// take no explicit notice of that fact in `slru.c`, except when
/// comparing segment and page numbers in `SimpleLruTruncate`
/// (see `PagePrecedes()`).
pub const SLRU_PAGES_PER_SEGMENT: usize = 32;
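// Illustrative arithmetic only, not part of this commit; the constant names below are
// made up. Assuming the standard Postgres layouts (CLOG packs four 2-bit transaction
// status values per byte, SUBTRANS stores one 4-byte parent XID per transaction), the
// figures quoted above work out as follows.
const SLRU_SEGMENT_BYTES: usize = BLCKSZ * SLRU_PAGES_PER_SEGMENT; // 262144 bytes = 256KB
const CLOG_XACTS_PER_SEGMENT: usize = (BLCKSZ * 4) * SLRU_PAGES_PER_SEGMENT; // 1_048_576, ~1 million
const SUBTRANS_XACTS_PER_SEGMENT: usize = (BLCKSZ / 4) * SLRU_PAGES_PER_SEGMENT; // 65_536 = 64K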

View File

@@ -44,8 +44,8 @@ pub enum NeonIOResult {
/// the result pages are written to the shared memory addresses given in the request
GetPageV,
/// The result is written to the shared memory address given in the
/// request.
ReadSlruSegment,
/// request. The [`u64`] value here is the number of blocks.
ReadSlruSegment(u64),
/// A prefetch request returns as soon as the request has been received by the communicator.
/// It is processed in the background.
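// Illustrative only, not part of this commit: the new payload is read back like any
// other enum data (this mirrors the C switch on `result.tag` further below), e.g.
//
//     fn slru_block_count(result: &NeonIOResult) -> Option<u64> {
//         match result {
//             NeonIOResult::ReadSlruSegment(n_blocks) => Some(*n_blocks),
//             _ => None,
//         }
//     }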

View File

@@ -107,6 +107,9 @@ pub(super) async fn init(
.integrated_cache_init_struct
.worker_process_init(last_lsn, file_cache);
info!("Initialised integrated cache: {cache:?}");
// TODO: plumb through the stripe size.
let tenant_id = TenantId::from_str(&tenant_id).expect("invalid tenant ID");
let timeline_id = TimelineId::from_str(&timeline_id).expect("invalid timeline ID");
let shard_spec = ShardSpec::new(shard_map, stripe_size).expect("invalid shard spec");
@@ -444,7 +447,9 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
std::ptr::copy_nonoverlapping(src.as_ptr(), dest.as_mut_ptr(), len);
};
NeonIOResult::ReadSlruSegment
// `len` is the number of bytes copied above; one SLRU block is BLCKSZ bytes.
let blocks_count = len / crate::BLCKSZ;
NeonIOResult::ReadSlruSegment(blocks_count as _)
}
Err(err) => {
info!("tonic error: {err:?}");

View File

@@ -997,7 +997,12 @@ communicator_new_dbsize(Oid dbNode)
}
int
communicator_new_read_slru_segment(SlruKind kind, uint32_t segno, void *buffer)
// communicator_new_read_slru_segment(SlruKind kind, uint32_t segno, void *buffer)
communicator_new_read_slru_segment(
SlruKind kind,
uint32_t segno,
neon_request_lsns *request_lsns,
void *buffer)
{
NeonIOResult result = {};
NeonIORequest request = {
@@ -1012,11 +1017,14 @@ communicator_new_read_slru_segment(SlruKind kind, uint32_t segno, void *buffer)
elog(DEBUG5, "readslrusegment called for kind=%u, segno=%u", kind, segno);
/* FIXME: see `request_lsns` in main_loop.rs for why this is needed */
XLogSetAsyncXactLSN(request_lsns->request_lsn);
perform_request(&request, &result);
switch (result.tag)
{
case NeonIOResult_ReadSlruSegment:
return 0;
return result.read_slru_segment;
case NeonIOResult_Error:
ereport(ERROR,
(errcode_for_file_access(),

View File

@@ -38,8 +38,14 @@ extern void communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkN
BlockNumber nblocks);
extern bool communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blockno);
extern int communicator_new_read_slru_segment(SlruKind kind, uint32_t segno,
void *buffer);
// extern int communicator_new_read_slru_segment(SlruKind kind, uint32_t segno,
// void *buffer);
extern int communicator_new_read_slru_segment(
SlruKind kind,
uint32_t segno,
neon_request_lsns *request_lsns,
void *buffer
);
/* Write requests, to keep the caches up-to-date */
extern void communicator_new_write_page(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno,

View File

@@ -2385,7 +2385,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
request_lsns.effective_request_lsn = request_lsn;
if (neon_enable_new_communicator)
n_blocks = communicator_new_read_slru_segment(kind, (uint32_t)segno, buffer);
n_blocks = communicator_new_read_slru_segment(kind, (uint32_t)segno, &request_lsns, buffer);
else
n_blocks = communicator_read_slru_segment(kind, segno, &request_lsns, buffer);