From 4cfa2fdca50405c4e7b71762235ee41227071278 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 7 Dec 2023 18:19:48 +0200 Subject: [PATCH] Support compression of get_page responses --- libs/pageserver_api/src/models.rs | 7 +++++ pageserver/src/page_service.rs | 15 +++++++-- pgxn/neon/Makefile | 2 +- pgxn/neon/pagestore_client.h | 10 ++++++ pgxn/neon/pagestore_smgr.c | 51 +++++++++++++++++++++++++++++++ 5 files changed, 81 insertions(+), 4 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index fe5bbd1c06..275c20d19b 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -757,6 +757,7 @@ pub enum PagestreamBeMessage { Error(PagestreamErrorResponse), DbSize(PagestreamDbSizeResponse), GetSlruSegment(PagestreamGetSlruSegmentResponse), + GetCompressedPage(PagestreamGetPageResponse), } // Keep in sync with `pagestore_client.h` @@ -996,6 +997,12 @@ impl PagestreamBeMessage { bytes.put(&resp.page[..]); } + Self::GetCompressedPage(resp) => { + bytes.put_u8(105); /* tag from pagestore_client.h */ + bytes.put_u16(resp.page.len() as u16); + bytes.put(&resp.page[..]); + } + Self::Error(resp) => { bytes.put_u8(Tag::Error as u8); bytes.put(resp.message.as_bytes()); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f3ceb7d3e6..f2b299579b 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1155,9 +1155,18 @@ impl PageServerHandler { .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx) .await?; - Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { - page, - })) + let compressed = lz4_flex::block::compress(&page); + if compressed.len() < page.len() { + Ok(PagestreamBeMessage::GetCompressedPage( + PagestreamGetPageResponse { + page: Bytes::from(compressed), + }, + )) + } else { + Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { + page, + })) + } } #[instrument(skip_all, fields(shard_id))] diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index 0bcb9545a6..1b27c8112d 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -18,7 +18,7 @@ OBJS = \ PG_CPPFLAGS = -I$(libpq_srcdir) SHLIB_LINK_INTERNAL = $(libpq) -SHLIB_LINK = -lcurl +SHLIB_LINK = -lcurl -llz4 EXTENSION = neon DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index 2889ffacae..3285820e0a 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -44,6 +44,7 @@ typedef enum T_NeonErrorResponse, T_NeonDbSizeResponse, T_NeonGetSlruSegmentResponse, + T_NeonGetCompressedPageResponse } NeonMessageTag; /* base struct for c-style inheritance */ @@ -144,6 +145,15 @@ typedef struct #define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ)) +typedef struct +{ + NeonMessageTag tag; + uint16 compressed_size; + char page[FLEXIBLE_ARRAY_MEMBER]; +} NeonGetCompressedPageResponse; + +#define PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressded_size) (MAXALIGN(offsetof(NeonGetCompressedPageResponse, page) + compressed_size)) + typedef struct { NeonMessageTag tag; diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 213e396328..84c68af74c 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -45,6 +45,10 @@ */ #include "postgres.h" +#ifdef USE_LZ4 +#include +#endif + #include "access/xact.h" #include "access/xlog.h" #include "access/xlogdefs.h" @@ -1059,6 +1063,7 @@ nm_pack_request(NeonRequest *msg) case T_NeonExistsResponse: case T_NeonNblocksResponse: case T_NeonGetPageResponse: + case T_NeonGetCompressedPageResponse: case T_NeonErrorResponse: case T_NeonDbSizeResponse: case T_NeonGetSlruSegmentResponse: @@ -1114,6 +1119,21 @@ nm_unpack_response(StringInfo s) Assert(msg_resp->tag == T_NeonGetPageResponse); + resp = (NeonResponse *) msg_resp; + break; + } + case T_NeonGetCompressedPageResponse: + { + NeonGetCompressedPageResponse *msg_resp; + uint16 compressed_size = pq_getmsgint(s, 2); + msg_resp = palloc0(PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressed_size)); + msg_resp->tag = tag; + msg_resp->compressed_size = compressed_size; + memcpy(msg_resp->page, pq_getmsgbytes(s, compressed_size), compressed_size); + pq_getmsgend(s); + + Assert(msg_resp->tag == T_NeonGetCompressedPageResponse); + resp = (NeonResponse *) msg_resp; break; } @@ -1287,6 +1307,14 @@ nm_to_string(NeonMessage *msg) appendStringInfoChar(&s, '}'); break; } + case T_NeonGetCompressedPageResponse: + { + NeonGetCompressedPageResponse *msg_resp = (NeonGetCompressedPageResponse *) msg; + appendStringInfoString(&s, "{\"type\": \"NeonGetCompressedPageResponse\""); + appendStringInfo(&s, ", \"compressed_page_size\": \"%d\"}", msg_resp->compressed_size); + appendStringInfoChar(&s, '}'); + break; + } case T_NeonErrorResponse: { NeonErrorResponse *msg_resp = (NeonErrorResponse *) msg; @@ -2205,6 +2233,29 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, lfc_write(rinfo, forkNum, blkno, buffer); break; + case T_NeonGetCompressedPageResponse: + { +#ifndef USE_LZ4 + ereport(ERROR, \ + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \ + errmsg("compression method lz4 not supported"), \ + errdetail("This functionality requires the server to be built with lz4 support."), \ + errhint("You need to rebuild PostgreSQL using %s.", "--with-lz4"))) +#else + NeonGetCompressedPageResponse* cp = (NeonGetCompressedPageResponse *) resp; + int rc = LZ4_decompress_safe(cp->page, + buffer, + cp->compressed_size, + BLCKSZ); + if (rc != BLCKSZ) { + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg_internal("compressed lz4 data is corrupt"))); + } + lfc_write(rinfo, forkNum, blkno, buffer); +#endif + break; + } case T_NeonErrorResponse: ereport(ERROR, (errcode(ERRCODE_IO_ERROR),