Implement autoprewarming in Neon uig pg_prewarm extension

This commit is contained in:
Konstantin Knizhnik
2022-11-02 10:33:12 +02:00
parent 144dd97025
commit 441dc1be2f
10 changed files with 114 additions and 10 deletions

View File

@@ -319,7 +319,7 @@ impl PostgresNode {
// uses only needed variables namely host, port, user, password.
format!("postgresql://no_user:{password}@{host}:{port}")
};
conf.append("shared_preload_libraries", "neon");
conf.append("shared_preload_libraries", "neon,pg_prewarm");
conf.append_line("");
conf.append("neon.pageserver_connstring", &pageserver_connstr);
conf.append("neon.tenant_id", &self.tenant_id.to_string());

View File

@@ -230,6 +230,7 @@ pub enum PagestreamFeMessage {
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
Fcntl(PagestreamFcntlRequest),
}
// Wrapped in libpq CopyData
@@ -270,6 +271,12 @@ pub struct PagestreamDbSizeRequest {
pub dbnode: u32,
}
#[derive(Debug)]
pub struct PagestreamFcntlRequest {
pub cmd: u32,
pub data: Bytes,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
pub exists: bool,
@@ -341,6 +348,14 @@ impl PagestreamFeMessage {
lsn: Lsn::from(body.get_u64()),
dbnode: body.get_u32(),
})),
4 => {
let cmd = body.get_u32();
let size = body.get_u32() as usize;
Ok(PagestreamFeMessage::Fcntl(PagestreamFcntlRequest {
cmd,
data: body.copy_to_bytes(size),
}))
}
_ => bail!("unknown smgr message tag: {},'{:?}'", msg_tag, body),
}
}

View File

@@ -232,3 +232,6 @@ pub const PGDATA_SPECIAL_FILES: [&str; 3] =
["pg_hba.conf", "pg_ident.conf", "postgresql.auto.conf"];
pub static PG_HBA: &str = include_str!("../samples/pg_hba.conf");
pub static AUTOPREWARM_FILE_NAME: &str = "autoprewarm.blocks";
pub const SMGR_FCNTL_CACHE_SNAPSHOT: u32 = 1;

View File

@@ -17,6 +17,7 @@ use itertools::Itertools;
use std::fmt::Write as FmtWrite;
use std::io;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::SystemTime;
use tar::{Builder, EntryType, Header};
@@ -25,8 +26,10 @@ use tracing::*;
use crate::tenant::Timeline;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants::{
AUTOPREWARM_FILE_NAME, PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA,
};
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};
use postgres_ffi::TransactionId;
use postgres_ffi::XLogFileName;
use postgres_ffi::PG_TLI;
@@ -145,6 +148,7 @@ where
self.ar.append(&header, &mut io::empty())?;
}
}
self.add_prewarm_file()?;
// Gather non-relational files from object storage pages.
for kind in [
@@ -218,6 +222,21 @@ where
Ok(())
}
//
// Include "autoprewarm-bin.blocks" in archive (if exists)
//
fn add_prewarm_file(&mut self) -> anyhow::Result<()> {
let path = self
.timeline
.conf
.timeline_path(&self.timeline.timeline_id, &self.timeline.tenant_id)
.join(AUTOPREWARM_FILE_NAME);
if PathBuf::from(&path).exists() {
self.ar.append_path_with_name(path, AUTOPREWARM_FILE_NAME)?;
}
Ok(())
}
//
// Generate SLRU segment files from repository.
//

View File

@@ -15,10 +15,11 @@ use futures::{Stream, StreamExt};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamNblocksRequest, PagestreamNblocksResponse,
PagestreamFcntlRequest, PagestreamFeMessage, PagestreamGetPageRequest,
PagestreamGetPageResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
};
use std::io;
use std::io::Write;
use std::net::TcpListener;
use std::str;
use std::str::FromStr;
@@ -45,9 +46,12 @@ use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use crate::tenant_mgr;
use crate::virtual_file::VirtualFile;
use crate::CheckpointConfig;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use crate::TEMP_FILE_SUFFIX;
use postgres_ffi::pg_constants::{
AUTOPREWARM_FILE_NAME, DEFAULTTABLESPACE_OID, SMGR_FCNTL_CACHE_SNAPSHOT,
};
use postgres_ffi::BLCKSZ;
fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream<Item = io::Result<Bytes>> + '_ {
@@ -317,6 +321,10 @@ impl PageServerHandler {
let _timer = metrics.get_db_size.start_timer();
self.handle_db_size_request(&timeline, &req).await
}
PagestreamFeMessage::Fcntl(req) => {
self.handle_fcntl_request(&timeline, &req).await?;
continue;
}
};
let response = response.unwrap_or_else(|e| {
@@ -586,6 +594,33 @@ impl PageServerHandler {
}))
}
async fn handle_fcntl_request(
&self,
timeline: &Timeline,
req: &PagestreamFcntlRequest,
) -> Result<()> {
if req.cmd == SMGR_FCNTL_CACHE_SNAPSHOT {
let temp_path = self
.conf
.timeline_path(&timeline.timeline_id, &timeline.tenant_id)
.join(format!("{AUTOPREWARM_FILE_NAME}.{TEMP_FILE_SUFFIX}"));
let mut file = VirtualFile::open_with_options(
&temp_path,
std::fs::OpenOptions::new().write(true).create_new(true),
)?;
file.write_all(&req.data)?;
drop(file);
let final_path = self
.conf
.timeline_path(&timeline.timeline_id, &timeline.tenant_id)
.join(AUTOPREWARM_FILE_NAME);
std::fs::rename(temp_path, &final_path)?;
} else {
warn!("Fcntl request {} is not supported", req.cmd);
}
Ok(())
}
#[instrument(skip(self, pgb))]
async fn handle_basebackup_request(
&self,

View File

@@ -62,8 +62,8 @@ use crate::{
};
pub struct Timeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
pub conf: &'static PageServerConf,
pub tenant_conf: Arc<RwLock<TenantConfOpt>>,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,

View File

@@ -32,6 +32,7 @@ typedef enum
T_NeonNblocksRequest,
T_NeonGetPageRequest,
T_NeonDbSizeRequest,
T_NeonFcntlRequest,
/* pagestore -> pagestore_client */
T_NeonExistsResponse = 100,
@@ -91,6 +92,14 @@ typedef struct
BlockNumber blkno;
} NeonGetPageRequest;
typedef struct
{
NeonRequest req;
int cmd;
int size;
char data[1];
} NeonFcntlRequest;
/* supertype of all the Neon*Response structs below */
typedef struct
{

View File

@@ -220,6 +220,15 @@ nm_pack_request(NeonRequest * msg)
break;
}
case T_NeonFcntlRequest:
{
NeonFcntlRequest *msg_req = (NeonFcntlRequest *) msg;
pq_sendint32(&s, msg_req->cmd);
pq_sendint32(&s, msg_req->size);
pq_sendbytes(&s, msg_req->data, msg_req->size);
break;
}
/* pagestore -> pagestore_client. We never need to create these. */
case T_NeonExistsResponse:
@@ -1014,6 +1023,19 @@ neon_prefetch_in_progress(SMgrRelation reln)
return n_prefetch_requests + n_prefetch_responses != 0;
}
void
neon_fcntl(SMgrRelation reln, int cmd, void const* data, size_t size)
{
NeonFcntlRequest* req = (NeonFcntlRequest *)palloc(sizeof(NeonFcntlRequest) + size);
req->req.tag = T_NeonFcntlRequest;
req->cmd = cmd;
req->size = (int)size;
memcpy(req->data, data, size);
page_server->send((NeonRequest*) req);
page_server->flush();
}
/*
* neon_prefetch() -- Initiate asynchronous read of the specified block of a relation
*/
@@ -1827,6 +1849,7 @@ static const struct f_smgr neon_smgr =
.smgr_start_unlogged_build = neon_start_unlogged_build,
.smgr_finish_unlogged_build_phase_1 = neon_finish_unlogged_build_phase_1,
.smgr_end_unlogged_build = neon_end_unlogged_build,
.smgr_fcntl = neon_fcntl
};
const f_smgr *