mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 08:52:56 +00:00
Handle get_raw_page_at_lsn() debugging function properly
This adds a new request type between backend and communicator, to make a getpage request at a given LSN, bypassing the LFC. Only used by the get_raw_page_at_lsn() debugging/testing function.
This commit is contained in:
@@ -36,6 +36,10 @@ pub enum NeonIORequest {
|
||||
PrefetchV(CPrefetchVRequest),
|
||||
DbSize(CDbSizeRequest),
|
||||
|
||||
/// This is like GetPageV, but bypasses the LFC and allows specifying the
|
||||
/// request LSNs directly. For debugging purposes only.
|
||||
GetPageVUncached(CGetPageVUncachedRequest),
|
||||
|
||||
// Write requests. These are needed to keep the relation size cache and LFC up-to-date.
|
||||
// They are not sent to the pageserver.
|
||||
WritePage(CWritePageRequest),
|
||||
@@ -89,6 +93,7 @@ impl NeonIORequest {
|
||||
Empty => 0,
|
||||
RelSize(req) => req.request_id,
|
||||
GetPageV(req) => req.request_id,
|
||||
GetPageVUncached(req) => req.request_id,
|
||||
ReadSlruSegment(req) => req.request_id,
|
||||
PrefetchV(req) => req.request_id,
|
||||
DbSize(req) => req.request_id,
|
||||
@@ -191,6 +196,24 @@ pub struct CGetPageVRequest {
|
||||
pub dest: [ShmemBuf; MAX_GETPAGEV_PAGES],
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct CGetPageVUncachedRequest {
|
||||
pub request_id: u64,
|
||||
pub spc_oid: COid,
|
||||
pub db_oid: COid,
|
||||
pub rel_number: u32,
|
||||
pub fork_number: u8,
|
||||
pub block_number: u32,
|
||||
pub nblocks: u8,
|
||||
|
||||
pub request_lsn: CLsn,
|
||||
pub not_modified_since: CLsn,
|
||||
|
||||
// These fields define where the result is written. Must point into a buffer in shared memory!
|
||||
pub dest: [ShmemBuf; MAX_GETPAGEV_PAGES],
|
||||
}
|
||||
|
||||
#[repr(C)]
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct CReadSlruSegmentRequest {
|
||||
@@ -331,6 +354,17 @@ impl CGetPageVRequest {
|
||||
}
|
||||
}
|
||||
|
||||
impl CGetPageVUncachedRequest {
|
||||
pub fn reltag(&self) -> page_api::RelTag {
|
||||
page_api::RelTag {
|
||||
spcnode: self.spc_oid,
|
||||
dbnode: self.db_oid,
|
||||
relnode: self.rel_number,
|
||||
forknum: self.fork_number,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl CPrefetchVRequest {
|
||||
pub fn reltag(&self) -> page_api::RelTag {
|
||||
page_api::RelTag {
|
||||
|
||||
@@ -9,7 +9,7 @@ use crate::file_cache::FileCache;
|
||||
use crate::global_allocator::MyAllocatorCollector;
|
||||
use crate::init::CommunicatorInitStruct;
|
||||
use crate::integrated_cache::{CacheResult, IntegratedCacheWriteAccess};
|
||||
use crate::neon_request::{CGetPageVRequest, CPrefetchVRequest};
|
||||
use crate::neon_request::{CGetPageVRequest, CGetPageVUncachedRequest, CPrefetchVRequest};
|
||||
use crate::neon_request::{INVALID_BLOCK_NUMBER, NeonIORequest, NeonIOResult};
|
||||
use crate::worker_process::control_socket;
|
||||
use crate::worker_process::in_progress_ios::{RequestInProgressKey, RequestInProgressTable};
|
||||
@@ -398,6 +398,12 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
|
||||
Ok(()) => NeonIOResult::GetPageV,
|
||||
Err(errno) => NeonIOResult::Error(errno),
|
||||
},
|
||||
NeonIORequest::GetPageVUncached(req) => {
|
||||
match self.handle_get_pagev_uncached_request(req).await {
|
||||
Ok(()) => NeonIOResult::GetPageV,
|
||||
Err(errno) => NeonIOResult::Error(errno),
|
||||
}
|
||||
}
|
||||
NeonIORequest::ReadSlruSegment(req) => {
|
||||
let lsn = Lsn(req.request_lsn);
|
||||
let file_path = req.destination_file_path();
|
||||
@@ -659,6 +665,71 @@ impl<'t> CommunicatorWorkerProcessStruct<'t> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Subroutine to handle an GetPageVUncached request.
|
||||
///
|
||||
/// Note: this bypasses the cache, in-progress IO locking, and all other side-effects.
|
||||
/// This request type is only used in tests.
|
||||
async fn handle_get_pagev_uncached_request(
|
||||
&'t self,
|
||||
req: &CGetPageVUncachedRequest,
|
||||
) -> Result<(), i32> {
|
||||
let rel = req.reltag();
|
||||
|
||||
// Construct a pageserver request
|
||||
let block_numbers: Vec<u32> =
|
||||
(req.block_number..(req.block_number + (req.nblocks as u32))).collect();
|
||||
let read_lsn = page_api::ReadLsn {
|
||||
request_lsn: Lsn(req.request_lsn),
|
||||
not_modified_since_lsn: Some(Lsn(req.not_modified_since)),
|
||||
};
|
||||
trace!(
|
||||
"sending (uncached) getpage request for blocks {:?} in rel {:?} lsns {}",
|
||||
block_numbers, rel, read_lsn
|
||||
);
|
||||
match self
|
||||
.client
|
||||
.get_page(page_api::GetPageRequest {
|
||||
request_id: req.request_id.into(),
|
||||
request_class: page_api::GetPageClass::Normal,
|
||||
read_lsn,
|
||||
rel,
|
||||
block_numbers: block_numbers.clone(),
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(resp) => {
|
||||
// Write the received page images directly to the shared memory location
|
||||
// that the backend requested.
|
||||
if resp.pages.len() != block_numbers.len() {
|
||||
error!(
|
||||
"received unexpected response with {} page images from pageserver for a request for {} pages",
|
||||
resp.pages.len(),
|
||||
block_numbers.len(),
|
||||
);
|
||||
return Err(-1);
|
||||
}
|
||||
|
||||
trace!(
|
||||
"received getpage response for blocks {:?} in rel {:?} lsns {}",
|
||||
block_numbers, rel, read_lsn
|
||||
);
|
||||
|
||||
for (page, dest) in resp.pages.into_iter().zip(req.dest) {
|
||||
let src: &[u8] = page.image.as_ref();
|
||||
let len = std::cmp::min(src.len(), dest.bytes_total());
|
||||
unsafe {
|
||||
std::ptr::copy_nonoverlapping(src.as_ptr(), dest.as_mut_ptr(), len);
|
||||
};
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
info!("tonic error: {err:?}");
|
||||
return Err(-1);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Subroutine to handle a PrefetchV request, since it's a little more complicated than
|
||||
/// others.
|
||||
///
|
||||
|
||||
@@ -635,8 +635,8 @@ communicator_new_rel_exists(NRelFileInfo rinfo, ForkNumber forkNum)
|
||||
* Read N consecutive pages from a relation
|
||||
*/
|
||||
void
|
||||
communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno,
|
||||
void **buffers, BlockNumber nblocks)
|
||||
communicator_new_readv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno,
|
||||
void **buffers, BlockNumber nblocks)
|
||||
{
|
||||
NeonIOResult result;
|
||||
CCachedGetPageVResult cached_result;
|
||||
@@ -698,8 +698,8 @@ communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumbe
|
||||
/* Split the vector-request into single page requests */
|
||||
for (int j = 0; j < nblocks; j++)
|
||||
{
|
||||
communicator_new_read_at_lsnv(rinfo, forkNum, blockno + j,
|
||||
&buffers[j], 1);
|
||||
communicator_new_readv(rinfo, forkNum, blockno + j,
|
||||
&buffers[j], 1);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -790,14 +790,68 @@ retry:
|
||||
if (bounce_buf_used)
|
||||
memcpy(buffers[0], bounce_buf_used, BLCKSZ);
|
||||
return;
|
||||
case NeonIOResult_Error:
|
||||
if (nblocks > 0)
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read block %u in rel %u/%u/%u.%u: %s",
|
||||
blockno, RelFileInfoFmt(rinfo), forkNum, pg_strerror(result.error))));
|
||||
else
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read %u blocks at %u in rel %u/%u/%u.%u: %s",
|
||||
nblocks, blockno, RelFileInfoFmt(rinfo), forkNum, pg_strerror(result.error))));
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected result for GetPageV operation: %d", result.tag);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Read a page at given LSN, bypassing the LFC.
|
||||
*
|
||||
* For tests and debugging purposes only.
|
||||
*/
|
||||
void
|
||||
communicator_new_read_at_lsn_uncached(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno,
|
||||
void *buffer, XLogRecPtr request_lsn, XLogRecPtr not_modified_since)
|
||||
{
|
||||
NeonIOResult result;
|
||||
void *bounce_buf_used;
|
||||
NeonIORequest request = {
|
||||
.tag = NeonIORequest_GetPageVUncached,
|
||||
.get_page_v_uncached = {
|
||||
.request_id = assign_request_id(),
|
||||
.spc_oid = NInfoGetSpcOid(rinfo),
|
||||
.db_oid = NInfoGetDbOid(rinfo),
|
||||
.rel_number = NInfoGetRelNumber(rinfo),
|
||||
.fork_number = forkNum,
|
||||
.block_number = blockno,
|
||||
.nblocks = 1,
|
||||
.request_lsn = request_lsn,
|
||||
.not_modified_since = not_modified_since,
|
||||
}
|
||||
};
|
||||
|
||||
/* This is for tests only and doesn't need to be particularly fast. Always use the bounce buffer for simplicity */
|
||||
request.get_page_v_uncached.dest[0].ptr = bounce_buf_used = bounce_buf();
|
||||
|
||||
/* don't use the specialized bcomm_start_get_page_v_request() function here, because we want to bypass the LFC */
|
||||
perform_request(&request, &result);
|
||||
switch (result.tag)
|
||||
{
|
||||
case NeonIOResult_GetPageV:
|
||||
memcpy(buffer, bounce_buf_used, BLCKSZ);
|
||||
return;
|
||||
case NeonIOResult_Error:
|
||||
ereport(ERROR,
|
||||
(errcode_for_file_access(),
|
||||
errmsg("could not read block %u in rel %u/%u/%u.%u: %s",
|
||||
errmsg("could not read (uncached) block %u in rel %u/%u/%u.%u: %s",
|
||||
blockno, RelFileInfoFmt(rinfo), forkNum, pg_strerror(result.error))));
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected result for GetPage operation: %d", result.tag);
|
||||
elog(ERROR, "unexpected result for GetPageV operation: %d", result.tag);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -1215,8 +1269,18 @@ print_neon_io_request(NeonIORequest *request)
|
||||
CGetPageVRequest *r = &request->get_page_v;
|
||||
|
||||
snprintf(buf, sizeof(buf), "GetPageV: req " UINT64_FORMAT " rel %u/%u/%u.%u blks %d-%d",
|
||||
r->request_id,
|
||||
r->spc_oid, r->db_oid, r->rel_number, r->fork_number, r->block_number, r->block_number + r->nblocks);
|
||||
r->request_id,
|
||||
r->spc_oid, r->db_oid, r->rel_number, r->fork_number, r->block_number, r->block_number + r->nblocks);
|
||||
return buf;
|
||||
}
|
||||
case NeonIORequest_GetPageVUncached:
|
||||
{
|
||||
CGetPageVUncachedRequest *r = &request->get_page_v_uncached;
|
||||
|
||||
snprintf(buf, sizeof(buf), "GetPageVUncached: req " UINT64_FORMAT " rel %u/%u/%u.%u blk %d request_lsn %X/%X not_modified_since %X/%X",
|
||||
r->request_id,
|
||||
r->spc_oid, r->db_oid, r->rel_number, r->fork_number, r->block_number,
|
||||
LSN_FORMAT_ARGS(r->request_lsn), LSN_FORMAT_ARGS(r->not_modified_since));
|
||||
return buf;
|
||||
}
|
||||
case NeonIORequest_ReadSlruSegment:
|
||||
@@ -1236,8 +1300,8 @@ print_neon_io_request(NeonIORequest *request)
|
||||
CPrefetchVRequest *r = &request->prefetch_v;
|
||||
|
||||
snprintf(buf, sizeof(buf), "PrefetchV: req " UINT64_FORMAT " rel %u/%u/%u.%u blks %d-%d",
|
||||
r->request_id,
|
||||
r->spc_oid, r->db_oid, r->rel_number, r->fork_number, r->block_number, r->block_number + r->nblocks);
|
||||
r->request_id,
|
||||
r->spc_oid, r->db_oid, r->rel_number, r->fork_number, r->block_number, r->block_number + r->nblocks);
|
||||
return buf;
|
||||
}
|
||||
case NeonIORequest_DbSize:
|
||||
@@ -1245,7 +1309,7 @@ print_neon_io_request(NeonIORequest *request)
|
||||
CDbSizeRequest *r = &request->db_size;
|
||||
|
||||
snprintf(buf, sizeof(buf), "PrefetchV: req " UINT64_FORMAT " db %u",
|
||||
r->request_id, r->db_oid);
|
||||
r->request_id, r->db_oid);
|
||||
return buf;
|
||||
}
|
||||
case NeonIORequest_WritePage:
|
||||
@@ -1253,9 +1317,9 @@ print_neon_io_request(NeonIORequest *request)
|
||||
CWritePageRequest *r = &request->write_page;
|
||||
|
||||
snprintf(buf, sizeof(buf), "WritePage: req " UINT64_FORMAT " rel %u/%u/%u.%u blk %u lsn %X/%X",
|
||||
r->request_id,
|
||||
r->spc_oid, r->db_oid, r->rel_number, r->fork_number, r->block_number,
|
||||
LSN_FORMAT_ARGS(r->lsn));
|
||||
r->request_id,
|
||||
r->spc_oid, r->db_oid, r->rel_number, r->fork_number, r->block_number,
|
||||
LSN_FORMAT_ARGS(r->lsn));
|
||||
return buf;
|
||||
}
|
||||
case NeonIORequest_RelExtend:
|
||||
|
||||
@@ -30,9 +30,11 @@ extern void communicator_new_init(void);
|
||||
extern bool communicator_new_rel_exists(NRelFileInfo rinfo, ForkNumber forkNum);
|
||||
extern BlockNumber communicator_new_rel_nblocks(NRelFileInfo rinfo, ForkNumber forknum);
|
||||
extern int64 communicator_new_dbsize(Oid dbNode);
|
||||
extern void communicator_new_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber base_blockno,
|
||||
void **buffers, BlockNumber nblocks);
|
||||
extern void communicator_new_readv(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber base_blockno,
|
||||
void **buffers, BlockNumber nblocks);
|
||||
extern void communicator_new_read_at_lsn_uncached(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blockno,
|
||||
void *buffer, XLogRecPtr request_lsn, XLogRecPtr not_modified_since);
|
||||
extern void communicator_new_prefetch_register_bufferv(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber blockno,
|
||||
BlockNumber nblocks);
|
||||
|
||||
@@ -1438,7 +1438,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
{
|
||||
// FIXME: request_lsns is ignored. That affects the neon_test_utils callers.
|
||||
// Add the capability to specify the LSNs explicitly, for the sake of neon_test_utils ?
|
||||
communicator_new_read_at_lsnv(rinfo, forkNum, blkno, &buffer, 1);
|
||||
communicator_new_read_at_lsn_uncached(rinfo, forkNum, blkno, buffer, request_lsns.request_lsn, request_lsns.not_modified_since);
|
||||
}
|
||||
else
|
||||
communicator_read_at_lsnv(rinfo, forkNum, blkno, &request_lsns, &buffer, 1, NULL);
|
||||
@@ -1569,8 +1569,8 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
|
||||
if (neon_use_communicator_worker)
|
||||
{
|
||||
communicator_new_read_at_lsnv(InfoFromSMgrRel(reln), forkNum, blkno,
|
||||
(void *) &buffer, 1);
|
||||
communicator_new_readv(InfoFromSMgrRel(reln), forkNum, blkno,
|
||||
(void *) &buffer, 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1685,8 +1685,8 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
|
||||
if (neon_use_communicator_worker)
|
||||
{
|
||||
communicator_new_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
buffers, nblocks);
|
||||
communicator_new_readv(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
buffers, nblocks);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user