mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-19 08:10:37 +00:00
Compare commits
2 Commits
proxy-prot
...
cannot-ret
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ca461ce2fd | ||
|
|
b497d0094e |
@@ -14,7 +14,6 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include <sys/file.h>
|
#include <sys/file.h>
|
||||||
#include <sys/statvfs.h>
|
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
|
|
||||||
@@ -38,9 +37,6 @@
|
|||||||
#include "storage/fd.h"
|
#include "storage/fd.h"
|
||||||
#include "storage/pg_shmem.h"
|
#include "storage/pg_shmem.h"
|
||||||
#include "storage/buf_internals.h"
|
#include "storage/buf_internals.h"
|
||||||
#include "storage/procsignal.h"
|
|
||||||
#include "postmaster/bgworker.h"
|
|
||||||
#include "postmaster/interrupt.h"
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Local file cache is used to temporary store relations pages in local file system.
|
* Local file cache is used to temporary store relations pages in local file system.
|
||||||
@@ -66,9 +62,6 @@
|
|||||||
|
|
||||||
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
|
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
|
||||||
|
|
||||||
#define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */
|
|
||||||
#define MAX_DISK_WRITE_RATE 1000 /* MB/sec */
|
|
||||||
|
|
||||||
typedef struct FileCacheEntry
|
typedef struct FileCacheEntry
|
||||||
{
|
{
|
||||||
BufferTag key;
|
BufferTag key;
|
||||||
@@ -91,14 +84,12 @@ static int lfc_desc = 0;
|
|||||||
static LWLockId lfc_lock;
|
static LWLockId lfc_lock;
|
||||||
static int lfc_max_size;
|
static int lfc_max_size;
|
||||||
static int lfc_size_limit;
|
static int lfc_size_limit;
|
||||||
static int lfc_free_space_watermark;
|
|
||||||
static char* lfc_path;
|
static char* lfc_path;
|
||||||
static FileCacheControl* lfc_ctl;
|
static FileCacheControl* lfc_ctl;
|
||||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||||
#if PG_VERSION_NUM>=150000
|
#if PG_VERSION_NUM>=150000
|
||||||
static shmem_request_hook_type prev_shmem_request_hook;
|
static shmem_request_hook_type prev_shmem_request_hook;
|
||||||
#endif
|
#endif
|
||||||
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark is reached */
|
|
||||||
|
|
||||||
void FileCacheMonitorMain(Datum main_arg);
|
void FileCacheMonitorMain(Datum main_arg);
|
||||||
|
|
||||||
@@ -254,80 +245,6 @@ lfc_change_limit_hook(int newval, void *extra)
|
|||||||
LWLockRelease(lfc_lock);
|
LWLockRelease(lfc_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* Local file system state monitor check available free space.
|
|
||||||
* If it is lower than lfc_free_space_watermark then we shrink size of local cache
|
|
||||||
* but throwing away least recently accessed chunks.
|
|
||||||
* First time low space watermark is reached cache size is divided by two,
|
|
||||||
* second time by four,... Finally we remove all chunks from local cache.
|
|
||||||
*
|
|
||||||
* Please notice that we are not changing lfc_cache_size: it is used to be adjusted by autoscaler.
|
|
||||||
* We only throw away cached chunks but do not prevent from filling cache by new chunks.
|
|
||||||
*
|
|
||||||
* Interval of poooling cache state is calculated as minimal time needed to consume lfc_free_space_watermark
|
|
||||||
* disk space with maximal possible disk write speed (1Gb/sec). But not larger than 1 second.
|
|
||||||
* Calling statvfs each second should not add any noticeable overhead.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
FileCacheMonitorMain(Datum main_arg)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Choose file system state monitor interval so that space can not be exosted
|
|
||||||
* during this period but not longer than MAX_MONITOR_INTERVAL (10 sec)
|
|
||||||
*/
|
|
||||||
uint64 monitor_interval = Min(MAX_MONITOR_INTERVAL_USEC, lfc_free_space_watermark*MB/MAX_DISK_WRITE_RATE);
|
|
||||||
|
|
||||||
/* Establish signal handlers. */
|
|
||||||
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
|
||||||
pqsignal(SIGHUP, SignalHandlerForConfigReload);
|
|
||||||
pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
|
|
||||||
BackgroundWorkerUnblockSignals();
|
|
||||||
|
|
||||||
/* Periodically dump buffers until terminated. */
|
|
||||||
while (!ShutdownRequestPending)
|
|
||||||
{
|
|
||||||
if (lfc_size_limit != 0)
|
|
||||||
{
|
|
||||||
struct statvfs sfs;
|
|
||||||
if (statvfs(lfc_path, &sfs) < 0)
|
|
||||||
{
|
|
||||||
elog(WARNING, "Failed to obtain status of %s: %m", lfc_path);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (sfs.f_bavail*sfs.f_bsize < lfc_free_space_watermark*MB)
|
|
||||||
{
|
|
||||||
if (lfc_shrinking_factor < 31) {
|
|
||||||
lfc_shrinking_factor += 1;
|
|
||||||
}
|
|
||||||
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
lfc_shrinking_factor = 0; /* reset to initial value */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pg_usleep(monitor_interval);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
lfc_register_free_space_monitor(void)
|
|
||||||
{
|
|
||||||
BackgroundWorker bgw;
|
|
||||||
memset(&bgw, 0, sizeof(bgw));
|
|
||||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
|
||||||
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
|
|
||||||
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
|
|
||||||
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "FileCacheMonitorMain");
|
|
||||||
snprintf(bgw.bgw_name, BGW_MAXLEN, "Local free space monitor");
|
|
||||||
snprintf(bgw.bgw_type, BGW_MAXLEN, "Local free space monitor");
|
|
||||||
bgw.bgw_restart_time = 5;
|
|
||||||
bgw.bgw_notify_pid = 0;
|
|
||||||
bgw.bgw_main_arg = (Datum) 0;
|
|
||||||
|
|
||||||
RegisterBackgroundWorker(&bgw);
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
void
|
||||||
lfc_init(void)
|
lfc_init(void)
|
||||||
{
|
{
|
||||||
@@ -364,19 +281,6 @@ lfc_init(void)
|
|||||||
lfc_change_limit_hook,
|
lfc_change_limit_hook,
|
||||||
NULL);
|
NULL);
|
||||||
|
|
||||||
DefineCustomIntVariable("neon.free_space_watermark",
|
|
||||||
"Minimal free space in local file system after reaching which local file cache will be truncated",
|
|
||||||
NULL,
|
|
||||||
&lfc_free_space_watermark,
|
|
||||||
1024, /* 1GB */
|
|
||||||
0,
|
|
||||||
INT_MAX,
|
|
||||||
PGC_SIGHUP,
|
|
||||||
GUC_UNIT_MB,
|
|
||||||
NULL,
|
|
||||||
NULL,
|
|
||||||
NULL);
|
|
||||||
|
|
||||||
DefineCustomStringVariable("neon.file_cache_path",
|
DefineCustomStringVariable("neon.file_cache_path",
|
||||||
"Path to local file cache (can be raw device)",
|
"Path to local file cache (can be raw device)",
|
||||||
NULL,
|
NULL,
|
||||||
@@ -391,9 +295,6 @@ lfc_init(void)
|
|||||||
if (lfc_max_size == 0)
|
if (lfc_max_size == 0)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (lfc_free_space_watermark != 0)
|
|
||||||
lfc_register_free_space_monitor();
|
|
||||||
|
|
||||||
prev_shmem_startup_hook = shmem_startup_hook;
|
prev_shmem_startup_hook = shmem_startup_hook;
|
||||||
shmem_startup_hook = lfc_shmem_startup;
|
shmem_startup_hook = lfc_shmem_startup;
|
||||||
#if PG_VERSION_NUM>=150000
|
#if PG_VERSION_NUM>=150000
|
||||||
|
|||||||
@@ -95,6 +95,20 @@ pub mod errors {
|
|||||||
_ => false,
|
_ => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn cannot_retry(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
// retry some transport errors
|
||||||
|
Self::Transport(io) => io.cannot_retry(),
|
||||||
|
// locked can be returned when the endpoint was in transition
|
||||||
|
// or when quotas are exceeded. don't retry when quotas are exceeded
|
||||||
|
Self::Console {
|
||||||
|
status: http::StatusCode::LOCKED,
|
||||||
|
ref text,
|
||||||
|
} => text.contains("quota"),
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<reqwest::Error> for ApiError {
|
impl From<reqwest::Error> for ApiError {
|
||||||
|
|||||||
@@ -414,6 +414,9 @@ where
|
|||||||
Ok(res) => return Ok(res),
|
Ok(res) => return Ok(res),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(error = ?e, "could not connect to compute node");
|
error!(error = ?e, "could not connect to compute node");
|
||||||
|
if e.cannot_retry() {
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
(invalidate_cache(node_info), e)
|
(invalidate_cache(node_info), e)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -501,6 +504,8 @@ pub fn handle_try_wake(
|
|||||||
|
|
||||||
pub trait ShouldRetry {
|
pub trait ShouldRetry {
|
||||||
fn could_retry(&self) -> bool;
|
fn could_retry(&self) -> bool;
|
||||||
|
/// return true if retrying definitely won't make a difference - or is harmful
|
||||||
|
fn cannot_retry(&self) -> bool;
|
||||||
fn should_retry(&self, num_retries: u32) -> bool {
|
fn should_retry(&self, num_retries: u32) -> bool {
|
||||||
match self {
|
match self {
|
||||||
_ if num_retries >= NUM_RETRIES_CONNECT => false,
|
_ if num_retries >= NUM_RETRIES_CONNECT => false,
|
||||||
@@ -517,6 +522,9 @@ impl ShouldRetry for io::Error {
|
|||||||
ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable | ErrorKind::TimedOut
|
ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable | ErrorKind::TimedOut
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
fn cannot_retry(&self) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ShouldRetry for tokio_postgres::error::DbError {
|
impl ShouldRetry for tokio_postgres::error::DbError {
|
||||||
@@ -530,6 +538,22 @@ impl ShouldRetry for tokio_postgres::error::DbError {
|
|||||||
| &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
|
| &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
fn cannot_retry(&self) -> bool {
|
||||||
|
use tokio_postgres::error::SqlState;
|
||||||
|
match self.code() {
|
||||||
|
&SqlState::TOO_MANY_CONNECTIONS
|
||||||
|
| &SqlState::INVALID_CATALOG_NAME
|
||||||
|
| &SqlState::INVALID_PASSWORD => true,
|
||||||
|
// pgbouncer errors?
|
||||||
|
&SqlState::PROTOCOL_VIOLATION => matches!(
|
||||||
|
self.message(),
|
||||||
|
"no more connections allowed (max_client_conn)"
|
||||||
|
| "server login has been failing, try again later (server_login_retry)"
|
||||||
|
| "query_wait_timeout"
|
||||||
|
),
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ShouldRetry for tokio_postgres::Error {
|
impl ShouldRetry for tokio_postgres::Error {
|
||||||
@@ -542,6 +566,13 @@ impl ShouldRetry for tokio_postgres::Error {
|
|||||||
false
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fn cannot_retry(&self) -> bool {
|
||||||
|
if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) {
|
||||||
|
tokio_postgres::error::DbError::cannot_retry(db_err)
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ShouldRetry for compute::ConnectionError {
|
impl ShouldRetry for compute::ConnectionError {
|
||||||
@@ -552,6 +583,13 @@ impl ShouldRetry for compute::ConnectionError {
|
|||||||
_ => false,
|
_ => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fn cannot_retry(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
compute::ConnectionError::Postgres(err) => err.cannot_retry(),
|
||||||
|
compute::ConnectionError::CouldNotConnect(err) => err.cannot_retry(),
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn retry_after(num_retries: u32) -> time::Duration {
|
pub fn retry_after(num_retries: u32) -> time::Duration {
|
||||||
|
|||||||
@@ -365,6 +365,9 @@ impl ShouldRetry for TestConnectError {
|
|||||||
fn could_retry(&self) -> bool {
|
fn could_retry(&self) -> bool {
|
||||||
self.retryable
|
self.retryable
|
||||||
}
|
}
|
||||||
|
fn cannot_retry(&self) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|||||||
Reference in New Issue
Block a user