mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 21:40:39 +00:00
Reduce number of flushes in PostgresBackend by delaying flush till the lst prefetch request
This commit is contained in:
@@ -300,7 +300,7 @@ impl PageServerHandler {
|
||||
trace!("query: {copy_data_bytes:?}");
|
||||
|
||||
let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?;
|
||||
|
||||
let mut do_flush = true;
|
||||
let response = match neon_fe_msg {
|
||||
PagestreamFeMessage::Exists(req) => {
|
||||
let _timer = metrics.get_rel_exists.start_timer();
|
||||
@@ -312,6 +312,7 @@ impl PageServerHandler {
|
||||
}
|
||||
PagestreamFeMessage::GetPage(req) => {
|
||||
let _timer = metrics.get_page_at_lsn.start_timer();
|
||||
do_flush = !req.prefetch;
|
||||
self.handle_get_page_at_lsn_request(&timeline, &req).await
|
||||
}
|
||||
PagestreamFeMessage::DbSize(req) => {
|
||||
@@ -330,7 +331,9 @@ impl PageServerHandler {
|
||||
});
|
||||
|
||||
pgb.write_message(&BeMessage::CopyData(&response.serialize()))?;
|
||||
pgb.flush().await?;
|
||||
if do_flush {
|
||||
pgb.flush().await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -60,6 +60,7 @@ typedef struct
|
||||
{
|
||||
NeonMessageTag tag;
|
||||
bool latest; /* if true, request latest page version */
|
||||
bool prefetch; /* if true, then request is followed by more prefetch requests */
|
||||
XLogRecPtr lsn; /* request page version @ this LSN */
|
||||
} NeonRequest;
|
||||
|
||||
|
||||
@@ -211,6 +211,7 @@ nm_pack_request(NeonRequest * msg)
|
||||
NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendbyte(&s, msg_req->req.prefetch);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendint32(&s, msg_req->rnode.spcNode);
|
||||
pq_sendint32(&s, msg_req->rnode.dbNode);
|
||||
@@ -383,6 +384,7 @@ nm_to_string(NeonMessage * msg)
|
||||
appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfo(&s, ", \"prefetch\": %d", msg_req->req.prefetch);
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
@@ -1141,6 +1143,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
.req.latest = request_latest,
|
||||
.req.prefetch = false,
|
||||
.req.lsn = request_lsn,
|
||||
.rnode = rnode,
|
||||
.forknum = forkNum,
|
||||
@@ -1150,6 +1153,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
if (n_prefetch_requests > 0)
|
||||
{
|
||||
/* Combine all prefetch requests with primary request */
|
||||
request.req.prefetch = true;
|
||||
page_server->send((NeonRequest *) & request);
|
||||
for (i = 0; i < n_prefetch_requests; i++)
|
||||
{
|
||||
@@ -1157,6 +1161,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
||||
request.forknum = prefetch_requests[i].forkNum;
|
||||
request.blkno = prefetch_requests[i].blockNum;
|
||||
prefetch_responses[i] = prefetch_requests[i];
|
||||
request.req.prefetch = i+1 < n_prefetch_requests;
|
||||
page_server->send((NeonRequest *) & request);
|
||||
}
|
||||
page_server->flush();
|
||||
|
||||
Reference in New Issue
Block a user