mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 12:10:37 +00:00
Compare commits
12 Commits
conrad/rem
...
heikki/hac
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b5e3d6d40 | ||
|
|
e086568e21 | ||
|
|
7b818f8d64 | ||
|
|
14fefd261f | ||
|
|
01abd4afc5 | ||
|
|
c8541ad29f | ||
|
|
eaad1db9f0 | ||
|
|
6ddcf68829 | ||
|
|
d701f8285c | ||
|
|
77082a0f63 | ||
|
|
e94acbc816 | ||
|
|
f4150614d0 |
@@ -15,6 +15,10 @@ commands:
|
||||
user: root
|
||||
sysvInitAction: sysinit
|
||||
shell: 'chmod 711 /neonvm/bin/set-disk-quota'
|
||||
- name: chmod-clockcache_dev
|
||||
user: root
|
||||
sysvInitAction: sysinit
|
||||
shell: 'chmod 777 /dev/clockcache_dev' # FIXME: not very secure
|
||||
- name: pgbouncer
|
||||
user: postgres
|
||||
sysvInitAction: respawn
|
||||
|
||||
@@ -213,8 +213,10 @@ impl Escaping for PgIdent {
|
||||
|
||||
// Find the first suitable tag that is not present in the string.
|
||||
// Postgres' max role/DB name length is 63 bytes, so even in the
|
||||
// worst case it won't take long.
|
||||
while self.contains(&format!("${tag}$")) || self.contains(&format!("${outer_tag}$")) {
|
||||
// worst case it won't take long. Outer tag is always `tag + "x"`,
|
||||
// so if `tag` is not present in the string, `outer_tag` is not
|
||||
// present in the string either.
|
||||
while self.contains(&tag.to_string()) {
|
||||
tag += "x";
|
||||
outer_tag = tag.clone() + "x";
|
||||
}
|
||||
|
||||
@@ -71,6 +71,14 @@ test.escaping = 'here''s a backslash \\ and a quote '' and a double-quote " hoor
|
||||
("name$$$", ("$x$name$$$$x$", "xx")),
|
||||
("name$$$$", ("$x$name$$$$$x$", "xx")),
|
||||
("name$x$", ("$xx$name$x$$xx$", "xxx")),
|
||||
("x", ("$xx$x$xx$", "xxx")),
|
||||
("xx", ("$xxx$xx$xxx$", "xxxx")),
|
||||
("$x", ("$xx$$x$xx$", "xxx")),
|
||||
("x$", ("$xx$x$$xx$", "xxx")),
|
||||
("$x$", ("$xx$$x$$xx$", "xxx")),
|
||||
("xx$", ("$xxx$xx$$xxx$", "xxxx")),
|
||||
("$xx", ("$xxx$$xx$xxx$", "xxxx")),
|
||||
("$xx$", ("$xxx$$xx$$xxx$", "xxxx")),
|
||||
];
|
||||
|
||||
for (input, expected) in test_cases {
|
||||
|
||||
@@ -235,7 +235,7 @@ pub enum PageServiceProtocolPipelinedBatchingStrategy {
|
||||
ScatteredLsn,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case")]
|
||||
pub enum GetVectoredConcurrentIo {
|
||||
/// The read path is fully sequential: layers are visited
|
||||
|
||||
@@ -144,7 +144,7 @@ where
|
||||
replica,
|
||||
ctx,
|
||||
io_concurrency: IoConcurrency::spawn_from_conf(
|
||||
timeline.conf,
|
||||
timeline.conf.get_vectored_concurrent_io,
|
||||
timeline
|
||||
.gate
|
||||
.enter()
|
||||
|
||||
@@ -3199,7 +3199,7 @@ async fn list_aux_files(
|
||||
.await?;
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
state.conf,
|
||||
state.conf.get_vectored_concurrent_io,
|
||||
timeline.gate.enter().map_err(|_| ApiError::Cancelled)?,
|
||||
);
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ use itertools::Itertools;
|
||||
use jsonwebtoken::TokenData;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::config::{
|
||||
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
|
||||
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
|
||||
PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
|
||||
};
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
@@ -331,10 +331,10 @@ async fn page_service_conn_main(
|
||||
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
|
||||
// and create the per-query context in process_query ourselves.
|
||||
let mut conn_handler = PageServerHandler::new(
|
||||
conf,
|
||||
tenant_manager,
|
||||
auth,
|
||||
pipelining_config,
|
||||
conf.get_vectored_concurrent_io,
|
||||
perf_span_fields,
|
||||
connection_ctx,
|
||||
cancel.clone(),
|
||||
@@ -371,7 +371,6 @@ async fn page_service_conn_main(
|
||||
}
|
||||
|
||||
struct PageServerHandler {
|
||||
conf: &'static PageServerConf,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
claims: Option<Claims>,
|
||||
|
||||
@@ -389,6 +388,7 @@ struct PageServerHandler {
|
||||
timeline_handles: Option<TimelineHandles>,
|
||||
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
|
||||
gate_guard: GateGuard,
|
||||
}
|
||||
@@ -844,17 +844,16 @@ impl BatchedFeMessage {
|
||||
impl PageServerHandler {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
perf_span_fields: ConnectionPerfSpanFields,
|
||||
connection_ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
gate_guard: GateGuard,
|
||||
) -> Self {
|
||||
PageServerHandler {
|
||||
conf,
|
||||
auth,
|
||||
claims: None,
|
||||
connection_ctx,
|
||||
@@ -862,6 +861,7 @@ impl PageServerHandler {
|
||||
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
|
||||
cancel,
|
||||
pipelining_config,
|
||||
get_vectored_concurrent_io,
|
||||
gate_guard,
|
||||
}
|
||||
}
|
||||
@@ -1623,7 +1623,7 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.get_vectored_concurrent_io,
|
||||
match self.gate_guard.try_clone() {
|
||||
Ok(guard) => guard,
|
||||
Err(_) => {
|
||||
|
||||
@@ -586,7 +586,7 @@ impl Timeline {
|
||||
// scan directory listing (new), merge with the old results
|
||||
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
@@ -645,7 +645,7 @@ impl Timeline {
|
||||
);
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
@@ -885,7 +885,7 @@ impl Timeline {
|
||||
);
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
|
||||
@@ -8596,8 +8596,10 @@ mod tests {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<Bytes>, GetVectoredError> {
|
||||
let io_concurrency =
|
||||
IoConcurrency::spawn_from_conf(tline.conf, tline.gate.enter().unwrap());
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
tline.conf.get_vectored_concurrent_io,
|
||||
tline.gate.enter().unwrap(),
|
||||
);
|
||||
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
|
||||
let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key..key.next()), lsn);
|
||||
let mut res = tline
|
||||
|
||||
@@ -31,6 +31,7 @@ pub use inmemory_layer::InMemoryLayer;
|
||||
pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
|
||||
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
|
||||
pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
|
||||
use pageserver_api::config::GetVectoredConcurrentIo;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
@@ -43,7 +44,6 @@ use self::inmemory_layer::InMemoryLayerFileId;
|
||||
use super::PageReconstructError;
|
||||
use super::layer_map::InMemoryLayerDesc;
|
||||
use super::timeline::{GetVectoredError, ReadPath};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
@@ -318,11 +318,10 @@ impl IoConcurrency {
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_from_conf(
|
||||
conf: &'static PageServerConf,
|
||||
conf: GetVectoredConcurrentIo,
|
||||
gate_guard: GateGuard,
|
||||
) -> IoConcurrency {
|
||||
use pageserver_api::config::GetVectoredConcurrentIo;
|
||||
let selected = match conf.get_vectored_concurrent_io {
|
||||
let selected = match conf {
|
||||
GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
|
||||
GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
|
||||
};
|
||||
|
||||
@@ -3530,7 +3530,7 @@ impl Timeline {
|
||||
};
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self_ref.conf,
|
||||
self_ref.conf.get_vectored_concurrent_io,
|
||||
self_ref
|
||||
.gate
|
||||
.enter()
|
||||
@@ -5559,7 +5559,7 @@ impl Timeline {
|
||||
});
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| CreateImageLayersError::Cancelled)?,
|
||||
|
||||
@@ -188,7 +188,7 @@ pub(crate) async fn generate_tombstone_image_layer(
|
||||
"removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
|
||||
);
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
detached.conf,
|
||||
detached.conf.get_vectored_concurrent_io,
|
||||
detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
|
||||
);
|
||||
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include "postgres.h"
|
||||
|
||||
#include <sys/file.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
@@ -52,6 +53,10 @@
|
||||
#include "pagestore_client.h"
|
||||
#include "communicator.h"
|
||||
|
||||
/* For the kernel module */
|
||||
#include "neon_pagecache.h"
|
||||
#define CLOCKCACHE_DEV_PATH "/dev/clockcache_dev"
|
||||
|
||||
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
|
||||
|
||||
/*
|
||||
@@ -159,6 +164,13 @@ typedef struct FileCacheControl
|
||||
uint64 time_write; /* time spent writing (us) */
|
||||
uint64 resizes; /* number of LFC resizes */
|
||||
uint64 evicted_pages; /* number of evicted pages */
|
||||
|
||||
/* FIXME: should make these atomic, they're not protected by any locks */
|
||||
uint64 kernel_module_read_hits; /* success returns from read ioctl */
|
||||
uint64 kernel_module_read_misses; /* ENOENT returns from read ioctl */
|
||||
uint64 kernel_module_write_hits; /* success returns from write ioctl */
|
||||
uint64 kernel_module_write_misses; /* ENOMEM returns from write ioctl */
|
||||
|
||||
dlist_head lru; /* double linked list for LRU replacement
|
||||
* algorithm */
|
||||
dlist_head holes; /* double linked list of punched holes */
|
||||
@@ -183,6 +195,7 @@ typedef struct FileCacheControl
|
||||
static HTAB *lfc_hash;
|
||||
static int lfc_desc = -1;
|
||||
static LWLockId lfc_lock;
|
||||
|
||||
static int lfc_max_size;
|
||||
static int lfc_size_limit;
|
||||
static int lfc_prewarm_limit;
|
||||
@@ -190,6 +203,8 @@ static int lfc_prewarm_batch;
|
||||
static int lfc_chunk_size_log = MAX_BLOCKS_PER_CHUNK_LOG;
|
||||
static int lfc_blocks_per_chunk = MAX_BLOCKS_PER_CHUNK;
|
||||
static char *lfc_path;
|
||||
static bool lfc_use_kernel_module;
|
||||
|
||||
static uint64 lfc_generation;
|
||||
static FileCacheControl *lfc_ctl;
|
||||
static bool lfc_do_prewarm;
|
||||
@@ -203,6 +218,9 @@ bool lfc_prewarm_update_ws_estimation;
|
||||
|
||||
#define LFC_ENABLED() (lfc_ctl->limit != 0)
|
||||
|
||||
static int pread_with_ioctl(void *buffer, uint64 blkno);
|
||||
static int pwrite_with_ioctl(const void *buffer, uint64 blkno);
|
||||
|
||||
/*
|
||||
* Close LFC file if opened.
|
||||
* All backends should close their LFC files once LFC is disabled.
|
||||
@@ -251,14 +269,19 @@ lfc_switch_off(void)
|
||||
/*
|
||||
* We need to use unlink to to avoid races in LFC write, because it is not
|
||||
* protected by lock
|
||||
*
|
||||
* FIXME: how to clean up the kernel module device on trouble?
|
||||
*/
|
||||
unlink(lfc_path);
|
||||
if (!lfc_use_kernel_module)
|
||||
{
|
||||
unlink(lfc_path);
|
||||
|
||||
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
|
||||
if (fd < 0)
|
||||
elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path);
|
||||
else
|
||||
close(fd);
|
||||
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
|
||||
if (fd < 0)
|
||||
elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path);
|
||||
else
|
||||
close(fd);
|
||||
}
|
||||
|
||||
/* Wakeup waiting backends */
|
||||
for (int i = 0; i < N_COND_VARS; i++)
|
||||
@@ -270,7 +293,8 @@ lfc_switch_off(void)
|
||||
static void
|
||||
lfc_disable(char const *op)
|
||||
{
|
||||
elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
|
||||
elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache",
|
||||
op, lfc_use_kernel_module ? CLOCKCACHE_DEV_PATH : lfc_path);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
lfc_switch_off();
|
||||
@@ -301,7 +325,9 @@ lfc_ensure_opened(void)
|
||||
/* Open cache file if not done yet */
|
||||
if (lfc_desc < 0)
|
||||
{
|
||||
lfc_desc = BasicOpenFile(lfc_path, O_RDWR);
|
||||
lfc_desc = BasicOpenFile(
|
||||
lfc_use_kernel_module ? CLOCKCACHE_DEV_PATH : lfc_path,
|
||||
O_RDWR);
|
||||
|
||||
if (lfc_desc < 0)
|
||||
{
|
||||
@@ -351,10 +377,16 @@ lfc_shmem_startup(void)
|
||||
initSHLL(&lfc_ctl->wss_estimation);
|
||||
|
||||
/* Recreate file cache on restart */
|
||||
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
|
||||
if (lfc_use_kernel_module)
|
||||
fd = BasicOpenFile(CLOCKCACHE_DEV_PATH, O_RDWR);
|
||||
else
|
||||
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
|
||||
if (fd < 0)
|
||||
{
|
||||
elog(WARNING, "LFC: failed to create local file cache %s: %m", lfc_path);
|
||||
if (lfc_use_kernel_module)
|
||||
elog(WARNING, "LFC: failed to open " CLOCKCACHE_DEV_PATH ": %m");
|
||||
else
|
||||
elog(WARNING, "LFC: failed to create local file cache %s: %m", lfc_path);
|
||||
lfc_ctl->limit = 0;
|
||||
}
|
||||
else
|
||||
@@ -613,6 +645,15 @@ lfc_init(void)
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
DefineCustomBoolVariable("neon.use_kernel_module",
|
||||
"Use neon_pagecache kernel module instead of a regular file (EXPERIMENTAL)",
|
||||
NULL,
|
||||
&lfc_use_kernel_module,
|
||||
true,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
if (lfc_max_size == 0)
|
||||
return;
|
||||
|
||||
@@ -1297,27 +1338,57 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
/* offset of first IOV */
|
||||
first_read_offset += chunk_offs + first_block_in_chunk_read;
|
||||
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
|
||||
|
||||
/* Read only the blocks we're interested in, limiting */
|
||||
rc = preadv(lfc_desc, &iov[first_block_in_chunk_read],
|
||||
nwrite, first_read_offset * BLCKSZ);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc != (BLCKSZ * nwrite))
|
||||
if (lfc_use_kernel_module)
|
||||
{
|
||||
lfc_disable("read");
|
||||
return -1;
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
|
||||
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
|
||||
{
|
||||
if (!BITMAP_ISSET(chunk_mask, i))
|
||||
continue;
|
||||
Assert(iov[i].iov_len == BLCKSZ);
|
||||
rc = pread_with_ioctl(iov[i].iov_base, first_read_offset + i - first_block_in_chunk_read);
|
||||
if (rc < 0 && errno == ENOENT)
|
||||
{
|
||||
/* The kernel module evicted the page */
|
||||
elog(DEBUG1, "kernel module had evicted block");
|
||||
}
|
||||
else if (rc < 0)
|
||||
{
|
||||
pgstat_report_wait_end();
|
||||
lfc_disable("ioctl read");
|
||||
return -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* success! */
|
||||
BITMAP_SET(mask, buf_offset + i);
|
||||
}
|
||||
}
|
||||
pgstat_report_wait_end();
|
||||
}
|
||||
|
||||
/*
|
||||
* We successfully read the pages we know were valid when we
|
||||
* started reading; now mark those pages as read
|
||||
*/
|
||||
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
|
||||
else
|
||||
{
|
||||
if (BITMAP_ISSET(chunk_mask, i))
|
||||
BITMAP_SET(mask, buf_offset + i);
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
|
||||
rc = preadv(lfc_desc, &iov[first_block_in_chunk_read],
|
||||
nwrite, first_read_offset * BLCKSZ);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc != (BLCKSZ * nwrite))
|
||||
{
|
||||
lfc_disable("read");
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* We successfully read the pages we know were valid when we
|
||||
* started reading; now mark those pages as read
|
||||
*/
|
||||
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
|
||||
{
|
||||
if (BITMAP_ISSET(chunk_mask, i))
|
||||
BITMAP_SET(mask, buf_offset + i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1364,6 +1435,65 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
return blocks_read;
|
||||
}
|
||||
|
||||
static int
|
||||
pread_with_ioctl(void *buffer, uint64 blkno)
|
||||
{
|
||||
struct neon_rw_args args = {
|
||||
.key = {
|
||||
.key_hi = 0,
|
||||
.key_lo = blkno
|
||||
},
|
||||
.offset = 0,
|
||||
.length = POSTGRES_PAGE_SIZE,
|
||||
.buffer = (__u64)(uintptr_t) buffer
|
||||
};
|
||||
int rc;
|
||||
|
||||
errno = 0;
|
||||
|
||||
elog(LOG, "calling ioctl read for blk %lu with buffer=%p (shared_buffers is at %p-%p)",
|
||||
blkno,
|
||||
buffer,
|
||||
BufferBlocks,
|
||||
BufferBlocks + BLCKSZ * NBuffers);
|
||||
rc = ioctl(lfc_desc, NEON_IOCTL_READ, &args);
|
||||
if (rc >= 0)
|
||||
lfc_ctl->kernel_module_read_hits++;
|
||||
else if (rc < 0 && errno == ENOENT)
|
||||
lfc_ctl->kernel_module_read_misses++;
|
||||
else
|
||||
elog(LOG, "ioctl read failed for blk %lu with buffer=%p: %m", blkno, buffer);
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int
|
||||
pwrite_with_ioctl(const void *buffer, uint64 blkno)
|
||||
{
|
||||
struct neon_rw_args args = {
|
||||
.key = {
|
||||
.key_hi = 0,
|
||||
.key_lo = blkno
|
||||
},
|
||||
.offset = 0,
|
||||
.length = POSTGRES_PAGE_SIZE,
|
||||
.buffer = (__u64)(uintptr_t) buffer
|
||||
};
|
||||
int rc;
|
||||
|
||||
elog(LOG, "calling ioctl write for blk %lu with buffer=%p (shared_buffers is at %p-%p)",
|
||||
blkno,
|
||||
buffer,
|
||||
BufferBlocks,
|
||||
BufferBlocks + BLCKSZ * NBuffers);
|
||||
|
||||
rc = ioctl(lfc_desc, NEON_IOCTL_WRITE, &args);
|
||||
if (rc >= 0)
|
||||
lfc_ctl->kernel_module_write_hits++;
|
||||
else if (rc < 0 && errno == ENOMEM)
|
||||
lfc_ctl->kernel_module_write_misses++;
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize new LFC hash entry, perform eviction if needed.
|
||||
* Returns false if there are no unpinned entries and chunk can not be added.
|
||||
@@ -1484,7 +1614,6 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
{
|
||||
BufferTag tag;
|
||||
FileCacheEntry *entry;
|
||||
ssize_t rc;
|
||||
bool found;
|
||||
uint32 hash;
|
||||
uint64 generation;
|
||||
@@ -1493,6 +1622,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
ConditionVariable* cv;
|
||||
FileCacheBlockState state;
|
||||
XLogRecPtr lwlsn;
|
||||
bool success;
|
||||
|
||||
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
|
||||
|
||||
@@ -1571,16 +1701,60 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
|
||||
INSTR_TIME_SET_CURRENT(io_start);
|
||||
rc = pwrite(lfc_desc, buffer, BLCKSZ,
|
||||
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
|
||||
INSTR_TIME_SET_CURRENT(io_end);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc != BLCKSZ)
|
||||
if (lfc_use_kernel_module)
|
||||
{
|
||||
lfc_disable("write");
|
||||
int rc;
|
||||
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
|
||||
INSTR_TIME_SET_CURRENT(io_start);
|
||||
rc = pwrite_with_ioctl(buffer,
|
||||
entry_offset * lfc_blocks_per_chunk + chunk_offs);
|
||||
INSTR_TIME_SET_CURRENT(io_end);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc < 0 && errno == ENOMEM)
|
||||
{
|
||||
/*
|
||||
* Write was wasted.
|
||||
*
|
||||
* FIXME: We could mark the page in the chunk as UNAVAILABLE,
|
||||
* since we know it was not actually present in the kernel
|
||||
* cache. Any subsequent read on it will inevitably fail with
|
||||
* ENOENT. That's not a correctness issue however, assuming that
|
||||
* the call never returns ENOMEM when the old version of the page
|
||||
* is still in the cache.
|
||||
*/
|
||||
success = true;
|
||||
}
|
||||
else if (rc < 0)
|
||||
{
|
||||
success = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* successful write */
|
||||
success = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ssize_t rc;
|
||||
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
|
||||
INSTR_TIME_SET_CURRENT(io_start);
|
||||
|
||||
rc = pwrite(lfc_desc, buffer, BLCKSZ,
|
||||
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
|
||||
|
||||
INSTR_TIME_SET_CURRENT(io_end);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
success = (rc == BLCKSZ);
|
||||
}
|
||||
|
||||
if (!success)
|
||||
{
|
||||
lfc_disable(lfc_use_kernel_module ? "write ioctl" : "write");
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1756,19 +1930,60 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
}
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
|
||||
INSTR_TIME_SET_CURRENT(io_start);
|
||||
rc = pwritev(lfc_desc, iov, blocks_in_chunk,
|
||||
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
|
||||
INSTR_TIME_SET_CURRENT(io_end);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc != BLCKSZ * blocks_in_chunk)
|
||||
/* Perform the write */
|
||||
if (lfc_use_kernel_module)
|
||||
{
|
||||
lfc_disable("write");
|
||||
return;
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
|
||||
INSTR_TIME_SET_CURRENT(io_start);
|
||||
for (int i = 0; i < blocks_in_chunk; i++)
|
||||
{
|
||||
int rc;
|
||||
|
||||
rc = pwrite_with_ioctl(
|
||||
iov[i].iov_base,
|
||||
entry_offset * lfc_blocks_per_chunk + chunk_offs
|
||||
);
|
||||
if (rc < 0 && errno == ENOMEM)
|
||||
{
|
||||
/*
|
||||
* Write was wasted.
|
||||
*
|
||||
* FIXME: We could mark the page in the chunk as UNAVAILABLE,
|
||||
* since we know it was not actually present in the kernel
|
||||
* cache. Any subsequent read on it will inevitably fail with
|
||||
* ENOENT. That's not a correctness issue however, assuming that
|
||||
* the call never returns ENOMEM when the old version of the page
|
||||
* is still in the cache.
|
||||
*/
|
||||
}
|
||||
else if (rc < 0)
|
||||
{
|
||||
/* other error, not expected */
|
||||
INSTR_TIME_SET_CURRENT(io_end);
|
||||
pgstat_report_wait_end();
|
||||
lfc_disable("write ioctl");
|
||||
return;
|
||||
}
|
||||
}
|
||||
INSTR_TIME_SET_CURRENT(io_end);
|
||||
pgstat_report_wait_end();
|
||||
}
|
||||
else
|
||||
{
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
|
||||
INSTR_TIME_SET_CURRENT(io_start);
|
||||
rc = pwritev(lfc_desc, iov, blocks_in_chunk,
|
||||
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
|
||||
INSTR_TIME_SET_CURRENT(io_end);
|
||||
pgstat_report_wait_end();
|
||||
if (rc != BLCKSZ * blocks_in_chunk)
|
||||
{
|
||||
lfc_disable("write");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* success */
|
||||
{
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
@@ -1922,6 +2137,26 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS)
|
||||
if (lfc_ctl)
|
||||
value = lfc_ctl->pinned;
|
||||
break;
|
||||
case 10:
|
||||
key = "file_cache_kernel_module_read_hits";
|
||||
if (lfc_ctl)
|
||||
value = lfc_ctl->kernel_module_read_hits;
|
||||
break;
|
||||
case 11:
|
||||
key = "file_cache_kernel_module_read_misses";
|
||||
if (lfc_ctl)
|
||||
value = lfc_ctl->kernel_module_read_misses;
|
||||
break;
|
||||
case 12:
|
||||
key = "file_cache_kernel_module_write_hits";
|
||||
if (lfc_ctl)
|
||||
value = lfc_ctl->kernel_module_write_hits;
|
||||
break;
|
||||
case 13:
|
||||
key = "file_cache_kernel_module_write_misses";
|
||||
if (lfc_ctl)
|
||||
value = lfc_ctl->kernel_module_write_misses;
|
||||
break;
|
||||
default:
|
||||
SRF_RETURN_DONE(funcctx);
|
||||
}
|
||||
|
||||
35
pgxn/neon/neon_pagecache.h
Normal file
35
pgxn/neon/neon_pagecache.h
Normal file
@@ -0,0 +1,35 @@
|
||||
/*
|
||||
* This is for the special ioctl in the neon_pagecache kernel module.
|
||||
*
|
||||
* DO NOT MODIFY! This header must agree with what the kernel module was
|
||||
* compiled with!
|
||||
*/
|
||||
|
||||
#ifndef NEON_PAGECACHE_H
|
||||
#define NEON_PAGECACHE_H
|
||||
|
||||
#include <linux/types.h>
|
||||
|
||||
|
||||
#define POSTGRES_PAGE_SIZE 8192 // 8 KiB
|
||||
|
||||
struct neon_key {
|
||||
__u64 key_hi; // Upper 64 bits of 128-bit key
|
||||
__u64 key_lo; // Lower 64 bits of 128-bit key
|
||||
};
|
||||
|
||||
struct neon_rw_args {
|
||||
struct neon_key key;
|
||||
__u32 offset; // Offset within page (0-8191)
|
||||
__u32 length; // Length to read/write
|
||||
__u64 buffer; // User buffer address
|
||||
};
|
||||
|
||||
#define NEON_IOC_MAGIC 'N'
|
||||
|
||||
#define NEON_IOCTL_READ _IOWR(NEON_IOC_MAGIC, 1, struct neon_rw_args)
|
||||
#define NEON_IOCTL_WRITE _IOWR(NEON_IOC_MAGIC, 2, struct neon_rw_args)
|
||||
|
||||
|
||||
|
||||
#endif /* NEON_PAGECACHE_H */
|
||||
@@ -19,6 +19,16 @@ TEST_ROLE_NAMES = [
|
||||
{"name": "role$"},
|
||||
{"name": "role$$"},
|
||||
{"name": "role$x$"},
|
||||
{"name": "x"},
|
||||
{"name": "xx"},
|
||||
{"name": "$x"},
|
||||
{"name": "x$"},
|
||||
{"name": "$x$"},
|
||||
{"name": "xx$"},
|
||||
{"name": "$xx"},
|
||||
{"name": "$xx$"},
|
||||
# 63 bytes is the limit for role/DB names in Postgres
|
||||
{"name": "x" * 63},
|
||||
]
|
||||
|
||||
TEST_DB_NAMES = [
|
||||
@@ -74,6 +84,43 @@ TEST_DB_NAMES = [
|
||||
"name": "db name$x$",
|
||||
"owner": "role$x$",
|
||||
},
|
||||
{
|
||||
"name": "x",
|
||||
"owner": "x",
|
||||
},
|
||||
{
|
||||
"name": "xx",
|
||||
"owner": "xx",
|
||||
},
|
||||
{
|
||||
"name": "$x",
|
||||
"owner": "$x",
|
||||
},
|
||||
{
|
||||
"name": "x$",
|
||||
"owner": "x$",
|
||||
},
|
||||
{
|
||||
"name": "$x$",
|
||||
"owner": "$x$",
|
||||
},
|
||||
{
|
||||
"name": "xx$",
|
||||
"owner": "xx$",
|
||||
},
|
||||
{
|
||||
"name": "$xx",
|
||||
"owner": "$xx",
|
||||
},
|
||||
{
|
||||
"name": "$xx$",
|
||||
"owner": "$xx$",
|
||||
},
|
||||
# 63 bytes is the limit for role/DB names in Postgres
|
||||
{
|
||||
"name": "x" * 63,
|
||||
"owner": "x" * 63,
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
@@ -146,6 +193,10 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test that compute_ctl can create and work with databases and roles
|
||||
with special characters (whitespaces, %, tabs, etc.) in the name.
|
||||
Also use `drop_subscriptions_before_start: true`. We do not actually
|
||||
have any subscriptions in this test, so it should be no-op, but it
|
||||
i) simulates the case when we create a second dev branch together with
|
||||
a new project creation, and ii) just generally stresses more code paths.
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
@@ -159,6 +210,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"drop_subscriptions_before_start": True,
|
||||
"cluster": {
|
||||
"roles": TEST_ROLE_NAMES,
|
||||
"databases": TEST_DB_NAMES,
|
||||
@@ -202,6 +254,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"drop_subscriptions_before_start": True,
|
||||
"cluster": {
|
||||
"roles": [],
|
||||
"databases": [],
|
||||
|
||||
Reference in New Issue
Block a user