From ab7a4d85744c21df69de9bedfc2c0b94848052af Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 22 Oct 2022 15:38:09 +0300 Subject: [PATCH] Reduce number of flushes in PostgresBackend by delaying flush till the lst prefetch request --- pageserver/src/page_service.rs | 7 +++++-- pgxn/neon/pagestore_client.h | 1 + pgxn/neon/pagestore_smgr.c | 5 +++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index aec91bc7f1..d139c9dedd 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -300,7 +300,7 @@ impl PageServerHandler { trace!("query: {copy_data_bytes:?}"); let neon_fe_msg = PagestreamFeMessage::parse(copy_data_bytes)?; - + let mut do_flush = true; let response = match neon_fe_msg { PagestreamFeMessage::Exists(req) => { let _timer = metrics.get_rel_exists.start_timer(); @@ -312,6 +312,7 @@ impl PageServerHandler { } PagestreamFeMessage::GetPage(req) => { let _timer = metrics.get_page_at_lsn.start_timer(); + do_flush = !req.prefetch; self.handle_get_page_at_lsn_request(&timeline, &req).await } PagestreamFeMessage::DbSize(req) => { @@ -330,7 +331,9 @@ impl PageServerHandler { }); pgb.write_message(&BeMessage::CopyData(&response.serialize()))?; - pgb.flush().await?; + if do_flush { + pgb.flush().await?; + } } Ok(()) } diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index e0cda11b63..3f6971fff9 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -60,6 +60,7 @@ typedef struct { NeonMessageTag tag; bool latest; /* if true, request latest page version */ + bool prefetch; /* if true, then request is followed by more prefetch requests */ XLogRecPtr lsn; /* request page version @ this LSN */ } NeonRequest; diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index f78b1e1cd1..6ed252843d 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -211,6 +211,7 @@ nm_pack_request(NeonRequest * msg) NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg; pq_sendbyte(&s, msg_req->req.latest); + pq_sendbyte(&s, msg_req->req.prefetch); pq_sendint64(&s, msg_req->req.lsn); pq_sendint32(&s, msg_req->rnode.spcNode); pq_sendint32(&s, msg_req->rnode.dbNode); @@ -383,6 +384,7 @@ nm_to_string(NeonMessage * msg) appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno); appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn)); appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest); + appendStringInfo(&s, ", \"prefetch\": %d", msg_req->req.prefetch); appendStringInfoChar(&s, '}'); break; } @@ -1141,6 +1143,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, NeonGetPageRequest request = { .req.tag = T_NeonGetPageRequest, .req.latest = request_latest, + .req.prefetch = false, .req.lsn = request_lsn, .rnode = rnode, .forknum = forkNum, @@ -1150,6 +1153,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, if (n_prefetch_requests > 0) { /* Combine all prefetch requests with primary request */ + request.req.prefetch = true; page_server->send((NeonRequest *) & request); for (i = 0; i < n_prefetch_requests; i++) { @@ -1157,6 +1161,7 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, request.forknum = prefetch_requests[i].forkNum; request.blkno = prefetch_requests[i].blockNum; prefetch_responses[i] = prefetch_requests[i]; + request.req.prefetch = i+1 < n_prefetch_requests; page_server->send((NeonRequest *) & request); } page_server->flush();