mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 20:12:54 +00:00
Use 1Mb chunk instead of page for loading data from pageserver
This commit is contained in:
@@ -294,6 +294,7 @@ impl PostgresNode {
|
||||
conf.append("max_replication_slots", "10");
|
||||
conf.append("hot_standby", "on");
|
||||
conf.append("shared_buffers", "1MB");
|
||||
conf.append("zenith.file_cache_size", "4096");
|
||||
conf.append("fsync", "off");
|
||||
conf.append("max_connections", "100");
|
||||
conf.append("wal_level", "replica");
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use lazy_static::lazy_static;
|
||||
use postgres_ffi::pg_constants::BLCKSZ;
|
||||
use regex::Regex;
|
||||
use std::net::TcpListener;
|
||||
use std::str;
|
||||
@@ -42,6 +43,8 @@ use crate::tenant_mgr;
|
||||
use crate::walreceiver;
|
||||
use crate::CheckpointConfig;
|
||||
|
||||
const CHUNK_SIZE: u32 = 128; // 1Mb
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
enum PagestreamFeMessage {
|
||||
Exists(PagestreamExistsRequest),
|
||||
@@ -91,7 +94,8 @@ struct PagestreamNblocksResponse {
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PagestreamGetPageResponse {
|
||||
page: Bytes,
|
||||
n_blocks: u32,
|
||||
data: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -162,7 +166,8 @@ impl PagestreamBeMessage {
|
||||
|
||||
Self::GetPage(resp) => {
|
||||
bytes.put_u8(102); /* tag from pagestore_client.h */
|
||||
bytes.put(&resp.page[..]);
|
||||
bytes.put_u32(resp.n_blocks);
|
||||
bytes.put(&resp.data[..]);
|
||||
}
|
||||
|
||||
Self::Error(resp) => {
|
||||
@@ -438,11 +443,18 @@ impl PageServerHandler {
|
||||
.entered();
|
||||
let tag = RelishTag::Relation(req.rel);
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest)?;
|
||||
|
||||
let page = timeline.get_page_at_lsn(tag, req.blkno, lsn)?;
|
||||
|
||||
let rel_size = timeline.get_relish_size(tag, lsn)?.unwrap_or(0);
|
||||
let blkno = req.blkno;
|
||||
let n_blocks = u32::min(blkno + CHUNK_SIZE, rel_size) - blkno;
|
||||
let mut data = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize);
|
||||
for i in 0..n_blocks {
|
||||
let page = timeline.get_page_at_lsn(tag, blkno + i, lsn)?;
|
||||
data.extend_from_slice(&page);
|
||||
}
|
||||
assert!(data.len() == n_blocks as usize * BLCKSZ as usize);
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
page,
|
||||
n_blocks,
|
||||
data: data.freeze(),
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
2
vendor/postgres
vendored
2
vendor/postgres
vendored
Submodule vendor/postgres updated: 3b166d06cf...6309cf1b52
Reference in New Issue
Block a user