Support compression of get_page responses

This commit is contained in:
Konstantin Knizhnik
2023-12-07 18:19:48 +02:00
parent 56ddf8e37f
commit 4cfa2fdca5
5 changed files with 81 additions and 4 deletions

View File

@@ -757,6 +757,7 @@ pub enum PagestreamBeMessage {
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
GetCompressedPage(PagestreamGetPageResponse),
}
// Keep in sync with `pagestore_client.h`
@@ -996,6 +997,12 @@ impl PagestreamBeMessage {
bytes.put(&resp.page[..]);
}
Self::GetCompressedPage(resp) => {
bytes.put_u8(105); /* tag from pagestore_client.h */
bytes.put_u16(resp.page.len() as u16);
bytes.put(&resp.page[..]);
}
Self::Error(resp) => {
bytes.put_u8(Tag::Error as u8);
bytes.put(resp.message.as_bytes());

View File

@@ -1155,9 +1155,18 @@ impl PageServerHandler {
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
.await?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
}))
let compressed = lz4_flex::block::compress(&page);
if compressed.len() < page.len() {
Ok(PagestreamBeMessage::GetCompressedPage(
PagestreamGetPageResponse {
page: Bytes::from(compressed),
},
))
} else {
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
}))
}
}
#[instrument(skip_all, fields(shard_id))]

View File

@@ -18,7 +18,7 @@ OBJS = \
PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
SHLIB_LINK = -lcurl -llz4
EXTENSION = neon
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql

View File

@@ -44,6 +44,7 @@ typedef enum
T_NeonErrorResponse,
T_NeonDbSizeResponse,
T_NeonGetSlruSegmentResponse,
T_NeonGetCompressedPageResponse
} NeonMessageTag;
/* base struct for c-style inheritance */
@@ -144,6 +145,15 @@ typedef struct
#define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ))
typedef struct
{
NeonMessageTag tag;
uint16 compressed_size;
char page[FLEXIBLE_ARRAY_MEMBER];
} NeonGetCompressedPageResponse;
#define PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressded_size) (MAXALIGN(offsetof(NeonGetCompressedPageResponse, page) + compressed_size))
typedef struct
{
NeonMessageTag tag;

View File

@@ -45,6 +45,10 @@
*/
#include "postgres.h"
#ifdef USE_LZ4
#include <lz4.h>
#endif
#include "access/xact.h"
#include "access/xlog.h"
#include "access/xlogdefs.h"
@@ -1059,6 +1063,7 @@ nm_pack_request(NeonRequest *msg)
case T_NeonExistsResponse:
case T_NeonNblocksResponse:
case T_NeonGetPageResponse:
case T_NeonGetCompressedPageResponse:
case T_NeonErrorResponse:
case T_NeonDbSizeResponse:
case T_NeonGetSlruSegmentResponse:
@@ -1114,6 +1119,21 @@ nm_unpack_response(StringInfo s)
Assert(msg_resp->tag == T_NeonGetPageResponse);
resp = (NeonResponse *) msg_resp;
break;
}
case T_NeonGetCompressedPageResponse:
{
NeonGetCompressedPageResponse *msg_resp;
uint16 compressed_size = pq_getmsgint(s, 2);
msg_resp = palloc0(PS_GETCOMPRESSEDPAGERESPONSE_SIZE(compressed_size));
msg_resp->tag = tag;
msg_resp->compressed_size = compressed_size;
memcpy(msg_resp->page, pq_getmsgbytes(s, compressed_size), compressed_size);
pq_getmsgend(s);
Assert(msg_resp->tag == T_NeonGetCompressedPageResponse);
resp = (NeonResponse *) msg_resp;
break;
}
@@ -1287,6 +1307,14 @@ nm_to_string(NeonMessage *msg)
appendStringInfoChar(&s, '}');
break;
}
case T_NeonGetCompressedPageResponse:
{
NeonGetCompressedPageResponse *msg_resp = (NeonGetCompressedPageResponse *) msg;
appendStringInfoString(&s, "{\"type\": \"NeonGetCompressedPageResponse\"");
appendStringInfo(&s, ", \"compressed_page_size\": \"%d\"}", msg_resp->compressed_size);
appendStringInfoChar(&s, '}');
break;
}
case T_NeonErrorResponse:
{
NeonErrorResponse *msg_resp = (NeonErrorResponse *) msg;
@@ -2205,6 +2233,29 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
lfc_write(rinfo, forkNum, blkno, buffer);
break;
case T_NeonGetCompressedPageResponse:
{
#ifndef USE_LZ4
ereport(ERROR, \
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \
errmsg("compression method lz4 not supported"), \
errdetail("This functionality requires the server to be built with lz4 support."), \
errhint("You need to rebuild PostgreSQL using %s.", "--with-lz4")))
#else
NeonGetCompressedPageResponse* cp = (NeonGetCompressedPageResponse *) resp;
int rc = LZ4_decompress_safe(cp->page,
buffer,
cp->compressed_size,
BLCKSZ);
if (rc != BLCKSZ) {
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg_internal("compressed lz4 data is corrupt")));
}
lfc_write(rinfo, forkNum, blkno, buffer);
#endif
break;
}
case T_NeonErrorResponse:
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),