mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 08:52:56 +00:00
Fix eviction
This commit is contained in:
@@ -477,7 +477,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
|
||||
// We can assume that it doesn't already exist, because the
|
||||
// caller is assumed to have already checked it, and holds
|
||||
// the io-in-progress lock. (The BlockEntry might exist, but no cache block)
|
||||
|
||||
|
||||
// Allocate a new block first
|
||||
let cache_block = {
|
||||
loop {
|
||||
@@ -543,7 +543,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
|
||||
// Maintenance routines
|
||||
|
||||
/// Evict one block from the file cache. This is used when the file cache fills up
|
||||
/// Returns the evicted block, it's not put to the fre list, so it's available for the
|
||||
/// Returns the evicted block. It's not put to the free list, so it's available for the
|
||||
/// caller to use immediately.
|
||||
pub fn try_evict_one_cache_block(&self) -> Option<CacheBlock> {
|
||||
let mut clock_hand = self.clock_hand.lock().unwrap();
|
||||
@@ -583,11 +583,10 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
|
||||
}
|
||||
|
||||
let _ = self
|
||||
.global_lw_lsn
|
||||
.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
|
||||
let cache_block = old.cache_block.load(Ordering::Relaxed);
|
||||
.global_lw_lsn
|
||||
.fetch_max(old.lw_lsn.load().0, Ordering::Relaxed);
|
||||
let cache_block = old.cache_block.swap(INVALID_CACHE_BLOCK, Ordering::Relaxed);
|
||||
if cache_block != INVALID_CACHE_BLOCK {
|
||||
self.page_evictions_counter.inc();
|
||||
evicted_cache_block = Some(cache_block);
|
||||
}
|
||||
// TODO: we don't evict the entry, just the block. Does it make
|
||||
@@ -597,6 +596,7 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
|
||||
}
|
||||
});
|
||||
if evicted_cache_block.is_some() {
|
||||
self.page_evictions_counter.inc();
|
||||
return evicted_cache_block;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,8 +35,6 @@ impl<'a> CommunicatorWorkerProcessStruct<'a> {
|
||||
|
||||
/// Expose Prometheus metrics.
|
||||
async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct<'static>>) -> Response {
|
||||
tracing::warn!("get_metrics called");
|
||||
|
||||
use metrics::core::Collector;
|
||||
let metrics = state.collect();
|
||||
|
||||
@@ -51,8 +49,6 @@ async fn get_metrics(State(state): State<&CommunicatorWorkerProcessStruct<'stati
|
||||
let encoder = TextEncoder::new();
|
||||
let mut buffer = vec![];
|
||||
|
||||
tracing::warn!("get_metrics done");
|
||||
|
||||
if let Err(e) = encoder.encode(&metrics, &mut buffer) {
|
||||
Response::builder()
|
||||
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
|
||||
@@ -417,7 +417,10 @@ process_inflight_requests(void)
|
||||
|
||||
/* FIXME: log errors */
|
||||
for (int i = 0; i < num_inflight_requests; i++)
|
||||
{
|
||||
elog(DEBUG4, "processing prefetch request with idx %d", inflight_requests[i]);
|
||||
wait_request_completion(inflight_requests[i], &result);
|
||||
}
|
||||
num_inflight_requests = 0;
|
||||
}
|
||||
|
||||
@@ -603,13 +606,14 @@ retry:
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
uint64_t cached_block = cached_result.cache_block_numbers[i];
|
||||
char *buffer = buffers[i];
|
||||
ssize_t bytes_total = 0;
|
||||
|
||||
while (bytes_total < BLCKSZ)
|
||||
{
|
||||
ssize_t nbytes;
|
||||
|
||||
nbytes = FileRead(cache_file, ((char *) buffers[i]) + bytes_total, BLCKSZ - bytes_total, cached_block * BLCKSZ + bytes_total, WAIT_EVENT_NEON_LFC_READ);
|
||||
nbytes = FileRead(cache_file, buffer + bytes_total, BLCKSZ - bytes_total, cached_block * BLCKSZ + bytes_total, WAIT_EVENT_NEON_LFC_READ);
|
||||
if (nbytes == -1)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
|
||||
Reference in New Issue
Block a user