mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 23:12:54 +00:00
SNAPSHOT from yday, still fails, incorrect LSN and probably the return value
This commit is contained in:
@@ -332,6 +332,13 @@ impl<'t> IntegratedCacheWriteAccess<'t> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the last written LSN.
|
||||
pub fn get_lsn(&'t self) -> Lsn {
|
||||
// TODO: supposedly, this should be the last written LSN, but it is not
|
||||
// , perhaps
|
||||
Lsn(self.global_lw_lsn.load(Ordering::Relaxed))
|
||||
}
|
||||
|
||||
pub fn get_db_size(&'t self, _db_oid: u32) -> CacheResult<u64> {
|
||||
// TODO: it would be nice to cache database sizes too. Getting the database size
|
||||
// is not a very common operation, but when you do it, it's often interactive, with
|
||||
|
||||
@@ -4,7 +4,7 @@ pub type COid = u32;
|
||||
// This conveniently matches PG_IOV_MAX
|
||||
pub const MAX_GETPAGEV_PAGES: usize = 32;
|
||||
|
||||
use pageserver_page_api as page_api;
|
||||
use pageserver_page_api::{self as page_api, SlruKind};
|
||||
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
#[repr(C)]
|
||||
@@ -17,6 +17,7 @@ pub enum NeonIORequest {
|
||||
RelExists(CRelExistsRequest),
|
||||
RelSize(CRelSizeRequest),
|
||||
GetPageV(CGetPageVRequest),
|
||||
ReadSlruSegment(CReadSlruSegmentRequest),
|
||||
PrefetchV(CPrefetchVRequest),
|
||||
DbSize(CDbSizeRequest),
|
||||
|
||||
@@ -42,6 +43,9 @@ pub enum NeonIOResult {
|
||||
|
||||
/// the result pages are written to the shared memory addresses given in the request
|
||||
GetPageV,
|
||||
/// The result is written to the shared memory address given in the
|
||||
/// request.
|
||||
ReadSlruSegment,
|
||||
|
||||
/// A prefetch request returns as soon as the request has been received by the communicator.
|
||||
/// It is processed in the background.
|
||||
@@ -67,6 +71,7 @@ impl NeonIORequest {
|
||||
RelExists(req) => req.request_id,
|
||||
RelSize(req) => req.request_id,
|
||||
GetPageV(req) => req.request_id,
|
||||
ReadSlruSegment(req) => req.request_id,
|
||||
PrefetchV(req) => req.request_id,
|
||||
DbSize(req) => req.request_id,
|
||||
WritePage(req) => req.request_id,
|
||||
@@ -174,6 +179,23 @@ pub struct CGetPageVRequest {
|
||||
pub dest: [ShmemBuf; MAX_GETPAGEV_PAGES],
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct CReadSlruSegmentRequest {
|
||||
pub request_id: u64,
|
||||
pub slru_kind: SlruKind,
|
||||
pub segment_number: u32,
|
||||
|
||||
// pub spc_oid: COid,
|
||||
// pub db_oid: COid,
|
||||
// pub rel_number: u32,
|
||||
// pub fork_number: u8,
|
||||
// pub block_number: u32,
|
||||
|
||||
// These fields define where the result is written. Must point into a buffer in shared memory!
|
||||
pub dest: ShmemBuf,
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct CPrefetchVRequest {
|
||||
|
||||
@@ -58,6 +58,7 @@ pub struct CommunicatorWorkerProcessStruct<'a> {
|
||||
request_rel_exists_counter: IntCounter,
|
||||
request_rel_size_counter: IntCounter,
|
||||
request_get_pagev_counter: IntCounter,
|
||||
request_read_slru_segment_counter: IntCounter,
|
||||
request_prefetchv_counter: IntCounter,
|
||||
request_db_size_counter: IntCounter,
|
||||
request_write_page_counter: IntCounter,
|
||||
@@ -123,6 +124,8 @@ pub(super) async fn init(
|
||||
let request_rel_exists_counter = request_counters.with_label_values(&["rel_exists"]);
|
||||
let request_rel_size_counter = request_counters.with_label_values(&["rel_size"]);
|
||||
let request_get_pagev_counter = request_counters.with_label_values(&["get_pagev"]);
|
||||
let request_read_slru_segment_counter =
|
||||
request_counters.with_label_values(&["read_slru_segment"]);
|
||||
let request_prefetchv_counter = request_counters.with_label_values(&["prefetchv"]);
|
||||
let request_db_size_counter = request_counters.with_label_values(&["db_size"]);
|
||||
let request_write_page_counter = request_counters.with_label_values(&["write_page"]);
|
||||
@@ -173,6 +176,7 @@ pub(super) async fn init(
|
||||
request_rel_exists_counter,
|
||||
request_rel_size_counter,
|
||||
request_get_pagev_counter,
|
||||
request_read_slru_segment_counter,
|
||||
request_prefetchv_counter,
|
||||
request_db_size_counter,
|
||||
request_write_page_counter,
|
||||
@@ -418,6 +422,36 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
|
||||
Err(errno) => NeonIOResult::Error(errno),
|
||||
}
|
||||
}
|
||||
NeonIORequest::ReadSlruSegment(req) => {
|
||||
self.request_read_slru_segment_counter.inc();
|
||||
let lsn = self.cache.get_lsn();
|
||||
|
||||
match self
|
||||
.client
|
||||
.get_slru_segment(page_api::GetSlruSegmentRequest {
|
||||
read_lsn: self.request_lsns(lsn),
|
||||
kind: req.slru_kind,
|
||||
segno: req.segment_number,
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(slru_bytes) => {
|
||||
let src: &[u8] = &slru_bytes.as_ref();
|
||||
let dest = req.dest;
|
||||
let len = std::cmp::min(src.len(), dest.bytes_total());
|
||||
|
||||
unsafe {
|
||||
std::ptr::copy_nonoverlapping(src.as_ptr(), dest.as_mut_ptr(), len);
|
||||
};
|
||||
|
||||
NeonIOResult::ReadSlruSegment
|
||||
}
|
||||
Err(err) => {
|
||||
info!("tonic error: {err:?}");
|
||||
NeonIOResult::Error(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
NeonIORequest::PrefetchV(req) => {
|
||||
self.request_prefetchv_counter.inc();
|
||||
self.request_prefetchv_nblocks_counter
|
||||
|
||||
@@ -997,10 +997,36 @@ communicator_new_dbsize(Oid dbNode)
|
||||
}
|
||||
|
||||
int
|
||||
communicator_new_read_slru_segment(SlruKind kind, int64 segno, void *buffer)
|
||||
communicator_new_read_slru_segment(SlruKind kind, uint32_t segno, void *buffer)
|
||||
{
|
||||
/* TODO */
|
||||
elog(ERROR, "not implemented");
|
||||
NeonIOResult result = {};
|
||||
NeonIORequest request = {
|
||||
.tag = NeonIORequest_ReadSlruSegment,
|
||||
.read_slru_segment = {
|
||||
.request_id = assign_request_id(),
|
||||
.slru_kind = kind,
|
||||
.segment_number = segno,
|
||||
.dest.ptr = buffer,
|
||||
}
|
||||
};
|
||||
|
||||
elog(DEBUG5, "readslrusegment called for kind=%u, segno=%u", kind, segno);
|
||||
|
||||
perform_request(&request, &result);
|
||||
switch (result.tag)
|
||||
{
|
||||
case NeonIOResult_ReadSlruSegment:
|
||||
return 0;
|
||||
case NeonIOResult_Error:
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read slru segment, kind=%u, segno=%u: %s",
|
||||
kind, segno, pg_strerror(result.error))));
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected result for read SLRU operation: %d", result.tag);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* Write requests */
|
||||
|
||||
@@ -38,7 +38,7 @@ extern void communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkN
|
||||
BlockNumber nblocks);
|
||||
extern bool communicator_new_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber blockno);
|
||||
extern int communicator_new_read_slru_segment(SlruKind kind, int64 segno,
|
||||
extern int communicator_new_read_slru_segment(SlruKind kind, uint32_t segno,
|
||||
void *buffer);
|
||||
|
||||
/* Write requests, to keep the caches up-to-date */
|
||||
|
||||
@@ -1276,7 +1276,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
tag.dbOid = reln->smgr_rlocator.locator.dbOid;
|
||||
tag.relNumber = reln->smgr_rlocator.locator.relNumber;
|
||||
tag.forkNum = forknum;
|
||||
|
||||
|
||||
while (nblocks > 0)
|
||||
{
|
||||
int iterblocks = Min(nblocks, PG_IOV_MAX);
|
||||
@@ -1664,7 +1664,7 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
{
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
request_lsns, nblocks);
|
||||
|
||||
|
||||
prefetch_result = communicator_prefetch_lookupv(InfoFromSMgrRel(reln), forknum,
|
||||
blocknum, request_lsns, nblocks,
|
||||
buffers, read_pages);
|
||||
@@ -2385,7 +2385,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
|
||||
request_lsns.effective_request_lsn = request_lsn;
|
||||
|
||||
if (neon_enable_new_communicator)
|
||||
n_blocks = communicator_new_read_slru_segment(kind, segno, buffer);
|
||||
n_blocks = communicator_new_read_slru_segment(kind, (uint32_t)segno, buffer);
|
||||
else
|
||||
n_blocks = communicator_read_slru_segment(kind, segno, &request_lsns, buffer);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user