From 319cd74f734d3a0b0d7c3a2ea216fef4f372bc3e Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sun, 11 May 2025 19:34:50 +0300 Subject: [PATCH] Fix eviction --- pgxn/neon/communicator/src/integrated_cache.rs | 12 ++++++------ .../src/worker_process/metrics_exporter.rs | 4 ---- pgxn/neon/communicator_new.c | 6 +++++- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pgxn/neon/communicator/src/integrated_cache.rs b/pgxn/neon/communicator/src/integrated_cache.rs index add2ee7f2e..f08a1793fd 100644 --- a/pgxn/neon/communicator/src/integrated_cache.rs +++ b/pgxn/neon/communicator/src/integrated_cache.rs @@ -477,7 +477,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { // We can assume that it doesn't already exist, because the // caller is assumed to have already checked it, and holds // the io-in-progress lock. (The BlockEntry might exist, but no cache block) - + // Allocate a new block first let cache_block = { loop { @@ -543,7 +543,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { // Maintenance routines /// Evict one block from the file cache. This is used when the file cache fills up - /// Returns the evicted block, it's not put to the fre list, so it's available for the + /// Returns the evicted block. It's not put to the free list, so it's available for the /// caller to use immediately. pub fn try_evict_one_cache_block(&self) -> Option { let mut clock_hand = self.clock_hand.lock().unwrap(); @@ -583,11 +583,10 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } let _ = self - .global_lw_lsn - .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed); - let cache_block = old.cache_block.load(Ordering::Relaxed); + .global_lw_lsn + .fetch_max(old.lw_lsn.load().0, Ordering::Relaxed); + let cache_block = old.cache_block.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed); if cache_block != INVALID_CACHE_BLOCK { - self.page_evictions_counter.inc(); evicted_cache_block = Some(cache_block); } // TODO: we don't evict the entry, just the block. Does it make @@ -597,6 +596,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> { } }); if evicted_cache_block.is_some() { + self.page_evictions_counter.inc(); return evicted_cache_block; } } diff --git a/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs index d6987978d4..f4de0c0f2d 100644 --- a/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs +++ b/pgxn/neon/communicator/src/worker_process/metrics_exporter.rs @@ -35,8 +35,6 @@ impl<'a> CommunicatorWorkerProcessStruct<'a> { /// Expose Prometheus metrics. async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct<'static>>) -> Response { - tracing::warn!("get_metrics called"); - use metrics::core::Collector; let metrics = state.collect(); @@ -51,8 +49,6 @@ async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct<'stati let encoder = TextEncoder::new(); let mut buffer = vec![]; - tracing::warn!("get_metrics done"); - if let Err(e) = encoder.encode(&metrics, &mut buffer) { Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) diff --git a/pgxn/neon/communicator_new.c b/pgxn/neon/communicator_new.c index 475af2fdc9..3be02ad57f 100644 --- a/pgxn/neon/communicator_new.c +++ b/pgxn/neon/communicator_new.c @@ -417,7 +417,10 @@ process_inflight_requests(void) /* FIXME: log errors */ for (int i = 0; i < num_inflight_requests; i++) + { + elog(DEBUG4, "processing prefetch request with idx %d", inflight_requests[i]); wait_request_completion(inflight_requests[i], &result); + } num_inflight_requests = 0; } @@ -603,13 +606,14 @@ retry: for (int i = 0; i < nblocks; i++) { uint64_t cached_block = cached_result.cache_block_numbers[i]; + char *buffer = buffers[i]; ssize_t bytes_total = 0; while (bytes_total < BLCKSZ) { ssize_t nbytes; - nbytes = FileRead(cache_file, ((char *) buffers[i]) + bytes_total, BLCKSZ - bytes_total, cached_block * BLCKSZ + bytes_total, WAIT_EVENT_NEON_LFC_READ); + nbytes = FileRead(cache_file, buffer + bytes_total, BLCKSZ - bytes_total, cached_block * BLCKSZ + bytes_total, WAIT_EVENT_NEON_LFC_READ); if (nbytes == -1) ereport(ERROR, (errcode_for_file_access(),