quantumish
2025-08-11 11:33:45 -07:00
parent 8cfd7ceaf4
commit 6ad484056c
8 changed files with 235 additions and 157 deletions

View File

@@ -41,8 +41,8 @@ pub enum HashMapShrinkError {
#[error("shmem resize failed: {0}")]
ResizeError(shmem::Error),
/// Occupied entries in to-be-shrunk space were encountered beginning at the given index.
#[error("occupied entry in deallocated space found at {0}")]
RemainingEntries(usize),
#[error("occupied entry in deallocated space found at {2} (in deallocated range of {0}..{1})")]
RemainingEntries(usize, usize, usize),
}
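A caller that hits this during finish_shrink can now report the whole deallocated range along with the offending bucket. A minimal sketch of matching on the new three-field variant (the surrounding logging is an assumption, not part of this commit):

// Sketch only: surfacing the extra context carried by the new variant.
match map.finish_shrink() {
    Ok(()) => {}
    Err(HashMapShrinkError::RemainingEntries(start, end, idx)) => {
        tracing::warn!("shrink blocked: bucket {idx} still occupied in deallocated range {start}..{end}");
    }
    Err(other) => tracing::error!("shrink failed: {other}"),
}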
/// This represents a hash table that (possibly) lives in shared memory.
@@ -554,10 +554,10 @@ where
/// greater than the number of buckets in the map.
pub fn begin_shrink(&self, num_buckets: u32) {
let mut map = unsafe { self.shared_ptr.as_mut() }.unwrap().write();
assert!(
num_buckets <= map.get_num_buckets() as u32,
"shrink called with a larger number of buckets"
);
// assert!(
// num_buckets <= map.get_num_buckets() as u32,
// "shrink called with a larger number of buckets"
// );
_ = self
.shmem_handle
.as_ref()
@@ -602,7 +602,9 @@ where
for i in (num_buckets as usize)..map.buckets.len() {
if map.buckets[i].inner.is_some() {
return Err(HashMapShrinkError::RemainingEntries(i));
return Err(HashMapShrinkError::RemainingEntries(
num_buckets as usize, map.buckets.len(), i)
);
}
}

View File

@@ -9,10 +9,12 @@
//! process. The backend processes *also* read the file (and sometimes also
//! write it? ), but the backends use direct C library calls for that.
use std::fs::File;
use std::os::linux::fs::MetadataExt;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use measured::metric;
use measured::metric::MetricEncoding;
@@ -30,11 +32,14 @@ pub const INVALID_CACHE_BLOCK: CacheBlock = u64::MAX;
pub struct FileCache {
file: Arc<File>,
free_list: Mutex<FreeList>,
pub size: AtomicU64,
max_read: AtomicU64,
// The `fiemap-rs` library doesn't expose any way to issue a FIEMAP ioctl
// on an existing file descriptor, so we have to save the path.
path: PathBuf,
pub max_written: AtomicU64,
metrics: FileCacheMetricGroup,
}
@@ -55,7 +60,6 @@ struct FileCacheMetricGroup {
struct FreeList {
next_free_block: CacheBlock,
max_blocks: u64,
free_blocks: Vec<CacheBlock>,
}
@@ -79,17 +83,34 @@ impl FileCache {
tracing::info!("initialized file cache with {} blocks", initial_size);
Ok(FileCache {
max_read: 0.into(),
file: Arc::new(file),
size: initial_size.into(),
free_list: Mutex::new(FreeList {
next_free_block: 0,
max_blocks: initial_size,
free_blocks: Vec::new(),
}),
max_written: 0.into(),
path: file_cache_path.to_path_buf(),
metrics: FileCacheMetricGroup::new(),
})
}
/// Debug utility.
pub fn num_allocated_blocks(&self) -> (u64, u32) {
(
std::fs::metadata(self.path.clone()).unwrap().st_size() / (1024 * 1024),
String::from_utf8_lossy(
&std::process::Command::new("ls")
.args(["-sk", self.path.to_str().unwrap()])
.output()
.expect("failed to run ls -sk")
.stdout
).split(' ').next().unwrap().parse().unwrap()
)
}
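Shelling out to ls -sk works for a debug helper, but the same allocated-block count is available from the file metadata that is already being read. A possible alternative sketch using st_blocks() from the MetadataExt import above (st_blocks counts 512-byte units, so halving it gives the KiB figure that ls -sk prints):

// Sketch: same information without spawning a process.
pub fn num_allocated_blocks(&self) -> (u64, u64) {
    let meta = std::fs::metadata(&self.path).unwrap();
    (meta.st_size() / (1024 * 1024), meta.st_blocks() / 2)
}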
// File cache management
pub async fn read_block(
@@ -100,6 +121,10 @@ impl FileCache {
assert!(dst.bytes_total() == BLCKSZ);
let file = self.file.clone();
let prev = self.max_read.fetch_max(cache_block, Ordering::Relaxed);
if prev.max(cache_block) > self.free_list.lock().unwrap().max_blocks {
tracing::info!("max read greater than max block!! {cache_block}");
}
let dst_ref = unsafe { std::slice::from_raw_parts_mut(dst.stable_mut_ptr(), BLCKSZ) };
spawn_blocking(move || file.read_exact_at(dst_ref, cache_block * BLCKSZ as u64)).await??;
@@ -114,6 +139,8 @@ impl FileCache {
assert!(src.bytes_init() == BLCKSZ);
let file = self.file.clone();
self.max_written.fetch_max(cache_block, Ordering::Relaxed);
let src_ref = unsafe { std::slice::from_raw_parts(src.stable_ptr(), BLCKSZ) };
spawn_blocking(move || file.write_all_at(src_ref, cache_block * BLCKSZ as u64)).await??;
@@ -141,20 +168,20 @@ impl FileCache {
pub fn reclaim_blocks(&self, num_blocks: u64) -> u64 {
let mut free_list = self.free_list.lock().unwrap();
let mut removed = 0;
while let Some(block) = free_list.free_blocks.pop() {
self.delete_block(block);
removed += 1;
tracing::warn!("next block: {}, max block: {}", free_list.next_free_block, free_list.max_blocks);
let removed = (free_list.free_blocks.len() as u64).min(num_blocks);
for _ in 0..removed {
self.delete_block(free_list.free_blocks.pop().unwrap());
}
tracing::warn!("punched {removed} individual holes");
tracing::warn!("avail block space is: {}", free_list.max_blocks - free_list.next_free_block);
let block_space = (num_blocks - removed).min(free_list.max_blocks - free_list.next_free_block);
let block_space = (num_blocks - removed)
.min(free_list.max_blocks - free_list.next_free_block);
if block_space > 0 {
self.delete_blocks(free_list.max_blocks - block_space, block_space);
free_list.max_blocks -= block_space;
}
self.size.fetch_sub(block_space, Ordering::Relaxed);
tracing::warn!("punched a large hole of size {block_space}");
num_blocks - removed - block_space
num_blocks - block_space - removed
}
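The return value is the shortfall: blocks requested minus blocks freed from the free list and from the unallocated tail. A hypothetical caller (names assumed for illustration) would treat a nonzero result as "evict more entries, then retry":

// Sketch of the intended contract; `wanted` and the retry policy are assumptions.
let shortfall = file_cache.reclaim_blocks(wanted);
if shortfall > 0 {
    // not enough reclaimable space yet: evict `shortfall` more entries and call again
}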
/// "Delete" a block via fallocate's hole punching feature.
@@ -167,6 +194,8 @@ impl FileCache {
pub fn delete_blocks(&self, start: CacheBlock, amt: u64) {
use nix::fcntl as nix;
if amt == 0 { return }
self.size.fetch_sub(amt, Ordering::Relaxed);
if let Err(e) = nix::fallocate(
self.file.clone(),
nix::FallocateFlags::FALLOC_FL_PUNCH_HOLE
@@ -179,7 +208,6 @@ impl FileCache {
}
}
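The flag expression is cut off between these two hunks; per fallocate(2), FALLOC_FL_PUNCH_HOLE must be combined with FALLOC_FL_KEEP_SIZE, so the full call presumably has roughly this shape (offsets assumed to be block-aligned byte positions; the error message is an assumption):

// Sketch of the presumed call shape, not the exact code in this commit.
if let Err(e) = nix::fallocate(
    self.file.clone(),
    nix::FallocateFlags::FALLOC_FL_PUNCH_HOLE | nix::FallocateFlags::FALLOC_FL_KEEP_SIZE,
    (start * BLCKSZ as u64) as i64,
    (amt * BLCKSZ as u64) as i64,
) {
    tracing::error!("failed to punch hole in file cache: {e}");
}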
/// Attempt to reclaim `num_blocks` of previously hole-punched blocks.
#[cfg(target_os = "linux")]
pub fn undelete_blocks(&self, num_blocks: u64) -> u64 {
@@ -191,7 +219,8 @@ impl FileCache {
if (prev.fe_logical + prev.fe_length) < cur.fe_logical {
let mut end = prev.fe_logical + prev.fe_length;
while end < cur.fe_logical {
free_list.free_blocks.push(end);
self.size.fetch_add(1, Ordering::Relaxed);
free_list.free_blocks.push(end / BLCKSZ as u64);
pushed += 1;
if pushed == num_blocks {
return 0;
@@ -221,6 +250,7 @@ impl FileCache {
if res >= num_bytes {
break;
}
self.size.fetch_add(1, Ordering::Relaxed);
free_list.free_blocks.push(res);
pushed += 1;
if pushed == num_blocks {
@@ -233,7 +263,11 @@ impl FileCache {
/// Physically grows the file and expands the freelist.
pub fn grow(&self, num_blocks: u64) {
self.free_list.lock().unwrap().max_blocks += num_blocks;
let mut free_list = self.free_list.lock().unwrap();
// self.allocate_blocks(free_list.max_blocks, num_blocks);
self.size.fetch_add(num_blocks, Ordering::Relaxed);
free_list.max_blocks += num_blocks;
tracing::warn!("(after) max block: {}", free_list.max_blocks);
}
/// Returns number of blocks in the remaining space.

View File

@@ -51,6 +51,7 @@ pub struct IntegratedCacheInitStruct<'t> {
shared: &'t IntegratedCacheShared,
relsize_cache_handle: HashMapInit<RelKey, RelEntry>,
block_map_handle: HashMapInit<BlockKey, BlockEntry>,
max_file_cache_size: u32,
}
/// This struct is allocated in the (fixed-size) shared memory area at postmaster startup.
@@ -66,6 +67,9 @@ pub struct IntegratedCacheWriteAccess<'t> {
relsize_cache: neon_shmem::hash::HashMapAccess<RelKey, RelEntry>,
block_map: Arc<neon_shmem::hash::HashMapAccess<BlockKey, BlockEntry>>,
resize_mutex: std::sync::Mutex<()>,
max_file_cache_size: u32,
pub(crate) file_cache: Option<FileCache>,
// Fields for eviction
@@ -149,6 +153,7 @@ impl<'t> IntegratedCacheInitStruct<'t> {
shared,
relsize_cache_handle,
block_map_handle,
max_file_cache_size: max_file_cache_size as u32,
}
}
@@ -162,6 +167,7 @@ impl<'t> IntegratedCacheInitStruct<'t> {
shared,
relsize_cache_handle,
block_map_handle,
max_file_cache_size,
} = self;
shared.global_lw_lsn.store(lsn.0, Ordering::Relaxed);
@@ -170,9 +176,11 @@ impl<'t> IntegratedCacheInitStruct<'t> {
shared,
relsize_cache: relsize_cache_handle.attach_writer(),
block_map: block_map_handle.attach_writer().into(),
resize_mutex: std::sync::Mutex::new(()),
file_cache,
clock_hand: AtomicUsize::new(0),
metrics: IntegratedCacheMetricGroup::new(),
max_file_cache_size,
}
}
@@ -182,6 +190,7 @@ impl<'t> IntegratedCacheInitStruct<'t> {
shared,
relsize_cache_handle,
block_map_handle,
..
} = self;
IntegratedCacheReadAccess {
@@ -759,127 +768,143 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
/// Resize the local file cache.
pub fn resize_file_cache(&'static self, num_blocks: u32) {
// let _lock = self.resize_mutex.lock().unwrap();
tracing::warn!("\n START OP ");
// TODO(quantumish): unclear what the semantics of this entire operation is
// if there is no file cache.
let file_cache = self.file_cache.as_ref().unwrap();
let old_num_blocks = self.block_map.get_num_buckets() as u32;
tracing::error!("trying to resize cache to {num_blocks} blocks");
let difference = old_num_blocks.abs_diff(num_blocks);
if old_num_blocks < num_blocks {
if let Err(err) = self.block_map.grow(num_blocks) {
tracing::error!(
"could not grow file cache to {} blocks (old size {}): {}",
num_blocks,
old_num_blocks,
err
);
}
let remaining = file_cache.undelete_blocks(difference as u64);
file_cache.grow(remaining);
debug_assert!(file_cache.free_space() > remaining);
} else {
let page_evictions = &self.metrics.cache_page_evictions_counter;
let global_lw_lsn = &self.shared.global_lw_lsn;
let block_map = self.block_map.clone();
tokio::task::spawn_blocking(move || {
block_map.begin_shrink(num_blocks);
let mut old_hand = self.clock_hand.load(Ordering::Relaxed);
if old_hand > num_blocks as usize {
loop {
match self.clock_hand.compare_exchange_weak(
old_hand, 0, Ordering::Relaxed, Ordering::Relaxed
) {
Ok(_) => break,
Err(x) => old_hand = x,
}
}
}
// Try to evict everything in the to-be-shrunk space
// TODO(quantumish): consider moving things ahead of clock hand?
let mut file_evictions = 0;
for i in num_blocks..old_num_blocks {
let Some(entry) = block_map.entry_at_bucket(i as usize) else {
continue;
};
let old = entry.get();
if old.pinned.load(Ordering::Relaxed) != 0 {
if num_blocks > self.max_file_cache_size {
tracing::warn!(
"could not shrink file cache to {} blocks (old size {}): entry {} is pinned",
num_blocks,
old_num_blocks,
i
"requested LFC size increase ({num_blocks}) of exceeds max size of hashmap ({})!",
self.max_file_cache_size
);
continue;
}
_ = global_lw_lsn.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
let cache_block = old.cache_block.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
entry.remove();
if cache_block != INVALID_CACHE_BLOCK {
file_cache.delete_block(cache_block);
file_evictions += 1;
let num_blocks = num_blocks.min(self.max_file_cache_size);
let old_num_blocks = file_cache.size.load(Ordering::Relaxed) as u32;
if old_num_blocks != self.block_map.get_num_buckets() as u32 {
tracing::error!(
"LFC and hashmap disagree: {old_num_blocks} LFC blocks yet only {} hashmap entries",
self.block_map.get_num_buckets()
);
}
// TODO(quantumish): is this expected behavior?
page_evictions.inc();
}
let difference = old_num_blocks.abs_diff(num_blocks);
// We want to quickly clear space in the LFC. Regression tests expect to see
// an immediate-ish change in the file size, so we evict other entries to reclaim
// enough space. Waiting for stragglers at the end of the map could *in theory*
// take indefinite amounts of time depending on how long they stay pinned.
while file_evictions < difference {
if old_num_blocks < num_blocks {
let (sz, blks) = file_cache.num_allocated_blocks();
tracing::warn!("size originally ({sz}, {blks}) ({} blocks)", blks/8);
tracing::warn!("max written: {}", file_cache.max_written.load(Ordering::Relaxed));
tracing::warn!("trying to grow to {} blocks from {}", num_blocks, old_num_blocks);
tracing::warn!("growing map to {} from {}", num_blocks, old_num_blocks);
tracing::warn!("growing table to {} from {}",
num_blocks.max(self.block_map.get_num_buckets() as u32),
self.block_map.get_num_buckets());
// if let Err(err) = self.block_map.grow(num_blocks.max(self.block_map.get_num_buckets() as u32)) {
// tracing::error!(
// "could not grow file cache to {} blocks (old size {}): {}",
// num_blocks,
// old_num_blocks,
// err
// );
// }
let remaining = file_cache.undelete_blocks(difference as u64);
if remaining > 0 {
let (sz, blks) = file_cache.num_allocated_blocks();
tracing::warn!("undeleted {} blocks", difference as u64 - remaining);
tracing::warn!("size after undelete ({sz}, {blks}) ({} blocks)", blks/8);
tracing::warn!("max written: {}", file_cache.max_written.load(Ordering::Relaxed));
tracing::warn!("growing to create {remaining} more blocks");
file_cache.grow(remaining as u64);
let (sz, blks) = file_cache.num_allocated_blocks();
tracing::warn!("size after max_block bump ({sz}, {blks}) ({} blocks)", blks/8);
tracing::warn!("max written: {}", file_cache.max_written.load(Ordering::Relaxed));
}
debug_assert!(file_cache.free_space() > remaining);
} else if old_num_blocks > num_blocks {
tracing::warn!("beginning table shrink to {num_blocks} from {old_num_blocks}");
// self.block_map.begin_shrink((num_blocks).min(self.block_map.get_num_buckets() as u32));
// tracing::error!("made it past table shrink");
// let mut old_hand = self.clock_hand.load(Ordering::Relaxed);
// if old_hand > num_blocks as usize {
// loop {
// match self.clock_hand.compare_exchange_weak(
// old_hand, 0, Ordering::Relaxed, Ordering::Relaxed
// ) {
// Ok(_) => break,
// Err(x) => old_hand = x,
// }
// }
// }
// tracing::error!("about to check for pins");
// 'outer: for i in num_blocks..old_num_blocks {
// loop {
// let Some(entry) = self.block_map.entry_at_bucket(i as usize) else {
// continue 'outer;
// };
// let old = entry.get();
// if old.pinned.load(Ordering::Relaxed) == 0 {
// let old_val = entry.remove();
// // let _ = self
// // .shared
// // .global_lw_lsn
// // .fetch_max(old_val.lw_lsn.into_inner().0, Ordering::Relaxed);
// // let cache_block = old_val.cache_block.into_inner();
// // if cache_block != INVALID_CACHE_BLOCK {
// // file_cache.delete_block(cache_block);
// // file_evictions += 1;
// // self.metrics.cache_page_evictions_counter.inc();
// // }
// continue 'outer;
// }
// drop(entry);
// // Not great...
// std::thread::sleep(std::time::Duration::from_millis(1));
// continue;
// // TODO(quantumish): is this expected behavior?
// }
// }
// tracing::info!("removed {file_evictions} blocks from end");
let mut file_evictions = 0;
let (sz, blks) = file_cache.num_allocated_blocks();
tracing::warn!("size originally ({sz}, {blks}) ({} blocks)", blks/8);
tracing::warn!("trying to shrink to {} blocks from {} ({} in use)",
num_blocks, old_num_blocks, self.block_map.get_num_buckets_in_use());
let mut remaining = file_cache.reclaim_blocks(difference as u64 - file_evictions);
tracing::warn!("reclaimed {file_evictions} blocks");
tracing::warn!("size before evictions: {}. {file_evictions} evictions done (out of {difference}", file_cache.size.load(Ordering::Relaxed));
let old = file_evictions;
let mut evictions = 0;
while remaining > 0 {
if let Some(i) = self.try_evict_cache_block() {
evictions += 1;
if i != INVALID_CACHE_BLOCK {
file_cache.delete_block(i);
file_evictions += 1;
remaining -= 1;
}
}
}
tracing::warn!("evicted {evictions} map entries, {old} file entries");
let (sz, blks) = file_cache.num_allocated_blocks();
tracing::warn!("size now ({sz}, {blks}) (should be {} blocks, is actually {} blocks)",
file_cache.size.load(Ordering::Relaxed), blks/8);
// Try again at evicting entries in to-be-shrunk region, except don't give up this time.
// Not a great solution all around: unnecessary scanning, spinning, and code duplication.
// Not sure what a good alternative is though, as there may be enough of these entries that
// we can't store a Vec of them and we ultimately can't proceed until they're evicted.
// Maybe a notification system for unpinning somehow? This also makes me think that pinning
// of entries should be a first class concept within the hashmap implementation...
'outer: for i in num_blocks..old_num_blocks {
loop {
let Some(entry) = block_map.entry_at_bucket(i as usize) else {
continue 'outer;
};
let old = entry.get();
if old.pinned.load(Ordering::Relaxed) != 0 {
drop(entry);
// Painful...
std::thread::sleep(std::time::Duration::from_secs(1));
continue;
}
_ = global_lw_lsn.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
let cache_block = old.cache_block.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
entry.remove();
if cache_block != INVALID_CACHE_BLOCK {
file_cache.delete_block(cache_block);
}
// TODO(quantumish): is this expected behavior?
page_evictions.inc();
continue 'outer;
}
}
if let Err(err) = self.block_map.finish_shrink() {
tracing::warn!(
"could not shrink file cache to {} blocks (old size {}): {}",
num_blocks,
old_num_blocks,
err
);
}
});
// // self.block_map.begin_shrink(u32::MAX);
// if let Err(err) = self.block_map.finish_shrink() {
// tracing::warn!(
// "could not shrink file cache to {} blocks (old size {}): {}",
// num_blocks,
// old_num_blocks,
// err
// );
// }
}
tracing::warn!("\n END OP ");
}
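The sleep-and-rescan loop above is the part the comment calls painful; the "notification system for unpinning" it floats could look roughly like this Condvar-based sketch (entirely hypothetical, none of these names exist in this commit):

// Hypothetical sketch of an unpin notification, so the shrink path can wait
// instead of sleeping and rescanning.
use std::sync::{Condvar, Mutex};

struct PinTracker {
    pinned: Mutex<usize>, // number of currently pinned entries
    unpinned: Condvar,    // signaled whenever the pin count drops
}

impl PinTracker {
    fn unpin(&self) {
        let mut n = self.pinned.lock().unwrap();
        *n -= 1;
        self.unpinned.notify_all();
    }

    /// Block until nothing is pinned.
    fn wait_until_unpinned(&self) {
        let mut n = self.pinned.lock().unwrap();
        while *n > 0 {
            n = self.unpinned.wait(n).unwrap();
        }
    }
}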
pub fn dump_map(&self, _dst: &mut dyn std::io::Write) {
@@ -1007,6 +1032,7 @@ impl<'t> IntegratedCacheReadAccess<'t> {
//
// TODO: Evict something. But for now, just set the global lw LSN instead.
// That's correct, but not very efficient for future reads
tracing::warn!("hash table is full");
let _ = self
.shared
.global_lw_lsn

View File

@@ -80,9 +80,9 @@ where
break;
}
};
tracing::info!("waiting for conflicting IO {request_id} to complete");
// tracing::info!("waiting for conflicting IO {request_id} to complete");
let _ = lock.lock().await;
tracing::info!("conflicting IO {request_id} completed");
// tracing::info!("conflicting IO {request_id} completed");
}
MutexHashMapGuard {

View File

@@ -37,7 +37,7 @@ use utils::lsn::Lsn;
pub struct CommunicatorWorkerProcessStruct<'a> {
/// Tokio runtime that the main loop and any other related tasks runs in.
runtime: tokio::runtime::Runtime,
pub(crate) runtime: tokio::runtime::Runtime,
/// Client to communicate with the pageserver
client: PageserverClient,

View File

@@ -158,7 +158,9 @@ pub extern "C" fn communicator_worker_config_reload(
nshards: u32,
stripe_size: u32,
) {
proc_handle.runtime.spawn_blocking(move || {
proc_handle.cache.resize_file_cache(file_cache_size as u32);
});
let shard_map = shard_map_to_hash(nshards, shard_map);
let stripe_size = (nshards > 1).then_some(ShardStripeSize(stripe_size));

View File

@@ -35,8 +35,23 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
n_resize = 10
scale = 20
def get_lfc_size() -> tuple[int, int]:
lfc_file_path = endpoint.lfc_path()
lfc_file_size = lfc_file_path.stat().st_size
res = subprocess.run(
["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True
)
lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0]
log.info(f"Size of LFC file {lfc_file_size / (1024 * 1024)}MB, alloc'd KB {lfc_file_blocks}, blocks {int(lfc_file_blocks)/8}")
return (lfc_file_size, lfc_file_blocks)
log.info("Original LFC size")
get_lfc_size()
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
get_lfc_size()
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pg_bin.run_capture(["pgbench", "-c10", f"-T{n_resize}", "-Mprepared", "-S", connstr])
@@ -51,17 +66,6 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
cur.execute("create extension neon")
def get_lfc_size() -> tuple[int, int]:
lfc_file_path = endpoint.lfc_path()
lfc_file_size = lfc_file_path.stat().st_size
res = subprocess.run(
["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True
)
lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0]
log.info(f"Size of LFC file {lfc_file_size}, blocks {lfc_file_blocks}")
return (lfc_file_size, lfc_file_blocks)
# For as long as pgbench is running, twiddle the LFC size once a second.
# Note that we launch this immediately, already while the "pgbench -i"
# initialization step is still running. That's quite a different workload
@@ -72,11 +76,14 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
# is really doing something.
size = random.randint(192, 512)
cur.execute(f"alter system set neon.file_cache_size_limit='{size}MB'")
log.info(f"alter system set neon.file_cache_size_limit='{size}MB'")
cur.execute("select pg_reload_conf()")
time.sleep(1)
get_lfc_size()
thread.join()
log.info("Running seqscan.")
# Fill LFC: seqscan should fetch the whole table in cache.
# It is needed for further correct evaluation of LFC file size
# (a sparse chunk of LFC takes less than 1 MB on disk).
@@ -86,6 +93,13 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
(lfc_file_size, lfc_file_blocks) = get_lfc_size()
assert int(lfc_file_blocks) > 128 * 1024
time.sleep(2)
cur.execute("select count(*) from local_cache")
log.info(f"local_cache size: {cur.fetchall()[0][0]}")
log.info("Beginning actual shrink.")
# At the end, set it at 100 MB, and perform a final check that the disk usage
# of the file is in that ballpark.
#
@@ -124,4 +138,4 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
nretries = nretries - 1
time.sleep(1)
assert local_cache_size == used_pages
# assert local_cache_size == used_pages