mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-09 21:50:37 +00:00
Compare commits
2 Commits
proxy-prot
...
cannot-ret
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ca461ce2fd | ||
|
|
b497d0094e |
@@ -14,7 +14,6 @@
|
||||
*/
|
||||
|
||||
#include <sys/file.h>
|
||||
#include <sys/statvfs.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
@@ -38,9 +37,6 @@
|
||||
#include "storage/fd.h"
|
||||
#include "storage/pg_shmem.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/procsignal.h"
|
||||
#include "postmaster/bgworker.h"
|
||||
#include "postmaster/interrupt.h"
|
||||
|
||||
/*
|
||||
* Local file cache is used to temporary store relations pages in local file system.
|
||||
@@ -66,9 +62,6 @@
|
||||
|
||||
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
|
||||
|
||||
#define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */
|
||||
#define MAX_DISK_WRITE_RATE 1000 /* MB/sec */
|
||||
|
||||
typedef struct FileCacheEntry
|
||||
{
|
||||
BufferTag key;
|
||||
@@ -91,14 +84,12 @@ static int lfc_desc = 0;
|
||||
static LWLockId lfc_lock;
|
||||
static int lfc_max_size;
|
||||
static int lfc_size_limit;
|
||||
static int lfc_free_space_watermark;
|
||||
static char* lfc_path;
|
||||
static FileCacheControl* lfc_ctl;
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||
#if PG_VERSION_NUM>=150000
|
||||
static shmem_request_hook_type prev_shmem_request_hook;
|
||||
#endif
|
||||
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark is reached */
|
||||
|
||||
void FileCacheMonitorMain(Datum main_arg);
|
||||
|
||||
@@ -254,80 +245,6 @@ lfc_change_limit_hook(int newval, void *extra)
|
||||
LWLockRelease(lfc_lock);
|
||||
}
|
||||
|
||||
/*
|
||||
* Local file system state monitor check available free space.
|
||||
* If it is lower than lfc_free_space_watermark then we shrink size of local cache
|
||||
* but throwing away least recently accessed chunks.
|
||||
* First time low space watermark is reached cache size is divided by two,
|
||||
* second time by four,... Finally we remove all chunks from local cache.
|
||||
*
|
||||
* Please notice that we are not changing lfc_cache_size: it is used to be adjusted by autoscaler.
|
||||
* We only throw away cached chunks but do not prevent from filling cache by new chunks.
|
||||
*
|
||||
* Interval of poooling cache state is calculated as minimal time needed to consume lfc_free_space_watermark
|
||||
* disk space with maximal possible disk write speed (1Gb/sec). But not larger than 1 second.
|
||||
* Calling statvfs each second should not add any noticeable overhead.
|
||||
*/
|
||||
void
|
||||
FileCacheMonitorMain(Datum main_arg)
|
||||
{
|
||||
/*
|
||||
* Choose file system state monitor interval so that space can not be exosted
|
||||
* during this period but not longer than MAX_MONITOR_INTERVAL (10 sec)
|
||||
*/
|
||||
uint64 monitor_interval = Min(MAX_MONITOR_INTERVAL_USEC, lfc_free_space_watermark*MB/MAX_DISK_WRITE_RATE);
|
||||
|
||||
/* Establish signal handlers. */
|
||||
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
||||
pqsignal(SIGHUP, SignalHandlerForConfigReload);
|
||||
pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
/* Periodically dump buffers until terminated. */
|
||||
while (!ShutdownRequestPending)
|
||||
{
|
||||
if (lfc_size_limit != 0)
|
||||
{
|
||||
struct statvfs sfs;
|
||||
if (statvfs(lfc_path, &sfs) < 0)
|
||||
{
|
||||
elog(WARNING, "Failed to obtain status of %s: %m", lfc_path);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (sfs.f_bavail*sfs.f_bsize < lfc_free_space_watermark*MB)
|
||||
{
|
||||
if (lfc_shrinking_factor < 31) {
|
||||
lfc_shrinking_factor += 1;
|
||||
}
|
||||
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
|
||||
}
|
||||
else
|
||||
lfc_shrinking_factor = 0; /* reset to initial value */
|
||||
}
|
||||
}
|
||||
pg_usleep(monitor_interval);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
lfc_register_free_space_monitor(void)
|
||||
{
|
||||
BackgroundWorker bgw;
|
||||
memset(&bgw, 0, sizeof(bgw));
|
||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
||||
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
|
||||
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
|
||||
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "FileCacheMonitorMain");
|
||||
snprintf(bgw.bgw_name, BGW_MAXLEN, "Local free space monitor");
|
||||
snprintf(bgw.bgw_type, BGW_MAXLEN, "Local free space monitor");
|
||||
bgw.bgw_restart_time = 5;
|
||||
bgw.bgw_notify_pid = 0;
|
||||
bgw.bgw_main_arg = (Datum) 0;
|
||||
|
||||
RegisterBackgroundWorker(&bgw);
|
||||
}
|
||||
|
||||
void
|
||||
lfc_init(void)
|
||||
{
|
||||
@@ -364,19 +281,6 @@ lfc_init(void)
|
||||
lfc_change_limit_hook,
|
||||
NULL);
|
||||
|
||||
DefineCustomIntVariable("neon.free_space_watermark",
|
||||
"Minimal free space in local file system after reaching which local file cache will be truncated",
|
||||
NULL,
|
||||
&lfc_free_space_watermark,
|
||||
1024, /* 1GB */
|
||||
0,
|
||||
INT_MAX,
|
||||
PGC_SIGHUP,
|
||||
GUC_UNIT_MB,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
DefineCustomStringVariable("neon.file_cache_path",
|
||||
"Path to local file cache (can be raw device)",
|
||||
NULL,
|
||||
@@ -391,9 +295,6 @@ lfc_init(void)
|
||||
if (lfc_max_size == 0)
|
||||
return;
|
||||
|
||||
if (lfc_free_space_watermark != 0)
|
||||
lfc_register_free_space_monitor();
|
||||
|
||||
prev_shmem_startup_hook = shmem_startup_hook;
|
||||
shmem_startup_hook = lfc_shmem_startup;
|
||||
#if PG_VERSION_NUM>=150000
|
||||
|
||||
@@ -95,6 +95,20 @@ pub mod errors {
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn cannot_retry(&self) -> bool {
|
||||
match self {
|
||||
// retry some transport errors
|
||||
Self::Transport(io) => io.cannot_retry(),
|
||||
// locked can be returned when the endpoint was in transition
|
||||
// or when quotas are exceeded. don't retry when quotas are exceeded
|
||||
Self::Console {
|
||||
status: http::StatusCode::LOCKED,
|
||||
ref text,
|
||||
} => text.contains("quota"),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<reqwest::Error> for ApiError {
|
||||
|
||||
@@ -129,18 +129,12 @@ impl<T: AsyncRead> WithClientIp<T> {
|
||||
// exit for bad header
|
||||
let len = usize::min(self.buf.len(), HEADER.len());
|
||||
if self.buf[..len] != HEADER[..len] {
|
||||
return Poll::Ready(Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
"invalid proxy protocol v2 header",
|
||||
)));
|
||||
return Poll::Ready(Ok(None));
|
||||
}
|
||||
|
||||
// if no more bytes available then exit
|
||||
if ready!(bytes_read) == 0 {
|
||||
return Poll::Ready(Err(io::Error::new(
|
||||
io::ErrorKind::UnexpectedEof,
|
||||
"missing proxy protocol v2 header",
|
||||
)));
|
||||
return Poll::Ready(Ok(None));
|
||||
};
|
||||
}
|
||||
|
||||
@@ -152,27 +146,27 @@ impl<T: AsyncRead> WithClientIp<T> {
|
||||
let command = vc & 0b1111;
|
||||
if version != 2 {
|
||||
return Poll::Ready(Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
io::ErrorKind::Other,
|
||||
"invalid proxy protocol version. expected version 2",
|
||||
)));
|
||||
}
|
||||
let local = match command {
|
||||
match command {
|
||||
// the connection was established on purpose by the proxy
|
||||
// without being relayed. The connection endpoints are the sender and the
|
||||
// receiver. Such connections exist when the proxy sends health-checks to the
|
||||
// server. The receiver must accept this connection as valid and must use the
|
||||
// real connection endpoints and discard the protocol block including the
|
||||
// family which is ignored.
|
||||
0 => true,
|
||||
0 => {}
|
||||
// the connection was established on behalf of another node,
|
||||
// and reflects the original connection endpoints. The receiver must then use
|
||||
// the information provided in the protocol block to get original the address.
|
||||
1 => false,
|
||||
1 => {}
|
||||
// other values are unassigned and must not be emitted by senders. Receivers
|
||||
// must drop connections presenting unexpected values here.
|
||||
_ => {
|
||||
return Poll::Ready(Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
io::ErrorKind::Other,
|
||||
"invalid proxy protocol command. expected local (0) or proxy (1)",
|
||||
)))
|
||||
}
|
||||
@@ -192,29 +186,8 @@ impl<T: AsyncRead> WithClientIp<T> {
|
||||
// - \x22 : UDP over IPv6 : the forwarded connection uses UDP over the AF_INET6
|
||||
// protocol family. Address length is 2*16 + 2*2 = 36 bytes.
|
||||
0x21 | 0x22 => 36,
|
||||
|
||||
// - \x31 : UNIX stream : the forwarded connection uses SOCK_STREAM over the
|
||||
// AF_UNIX protocol family. Address length is 2*108 = 216 bytes.
|
||||
// - \x32 : UNIX datagram : the forwarded connection uses SOCK_DGRAM over the
|
||||
// AF_UNIX protocol family. Address length is 2*108 = 216 bytes.
|
||||
0x31 | 0x32 => 216,
|
||||
|
||||
// UNSPEC : the connection is forwarded for an unknown, unspecified
|
||||
// or unsupported protocol. The sender should use this family when sending
|
||||
// LOCAL commands or when dealing with unsupported protocol families. When
|
||||
// used with a LOCAL command, the receiver must accept the connection and
|
||||
// ignore any address information. For other commands, the receiver is free
|
||||
// to accept the connection anyway and use the real endpoints addresses or to
|
||||
// reject the connection. The receiver should ignore address information.
|
||||
0x00 | 0x01 | 0x02 | 0x10 | 0x20 | 0x30 if local => 0,
|
||||
|
||||
// unspecified or invalid. ignore the addresses
|
||||
_ => {
|
||||
return Poll::Ready(Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
"invalid proxy protocol address family/transport protocol",
|
||||
)))
|
||||
}
|
||||
// unspecified or unix stream. ignore the addresses
|
||||
_ => 0,
|
||||
};
|
||||
|
||||
// The 15th and 16th bytes is the address length in bytes in network endian order.
|
||||
@@ -446,7 +419,6 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[should_panic(expected = "invalid proxy protocol v2 header")]
|
||||
async fn test_invalid() {
|
||||
let data = [0x55; 256];
|
||||
|
||||
@@ -454,15 +426,20 @@ mod tests {
|
||||
|
||||
let mut bytes = vec![];
|
||||
read.read_to_end(&mut bytes).await.unwrap();
|
||||
assert_eq!(bytes, data);
|
||||
assert_eq!(read.state, ProxyParse::None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[should_panic(expected = "missing proxy protocol v2 header")]
|
||||
async fn test_short() {
|
||||
let mut read = pin!(WithClientIp::new(&super::HEADER.as_slice()[..10]));
|
||||
let data = [0x55; 10];
|
||||
|
||||
let mut read = pin!(WithClientIp::new(data.as_slice()));
|
||||
|
||||
let mut bytes = vec![];
|
||||
read.read_to_end(&mut bytes).await.unwrap();
|
||||
assert_eq!(bytes, data);
|
||||
assert_eq!(read.state, ProxyParse::None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -414,6 +414,9 @@ where
|
||||
Ok(res) => return Ok(res),
|
||||
Err(e) => {
|
||||
error!(error = ?e, "could not connect to compute node");
|
||||
if e.cannot_retry() {
|
||||
return Err(e.into());
|
||||
}
|
||||
(invalidate_cache(node_info), e)
|
||||
}
|
||||
};
|
||||
@@ -501,6 +504,8 @@ pub fn handle_try_wake(
|
||||
|
||||
pub trait ShouldRetry {
|
||||
fn could_retry(&self) -> bool;
|
||||
/// return true if retrying definitely won't make a difference - or is harmful
|
||||
fn cannot_retry(&self) -> bool;
|
||||
fn should_retry(&self, num_retries: u32) -> bool {
|
||||
match self {
|
||||
_ if num_retries >= NUM_RETRIES_CONNECT => false,
|
||||
@@ -517,6 +522,9 @@ impl ShouldRetry for io::Error {
|
||||
ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable | ErrorKind::TimedOut
|
||||
)
|
||||
}
|
||||
fn cannot_retry(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
impl ShouldRetry for tokio_postgres::error::DbError {
|
||||
@@ -530,6 +538,22 @@ impl ShouldRetry for tokio_postgres::error::DbError {
|
||||
| &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
|
||||
)
|
||||
}
|
||||
fn cannot_retry(&self) -> bool {
|
||||
use tokio_postgres::error::SqlState;
|
||||
match self.code() {
|
||||
&SqlState::TOO_MANY_CONNECTIONS
|
||||
| &SqlState::INVALID_CATALOG_NAME
|
||||
| &SqlState::INVALID_PASSWORD => true,
|
||||
// pgbouncer errors?
|
||||
&SqlState::PROTOCOL_VIOLATION => matches!(
|
||||
self.message(),
|
||||
"no more connections allowed (max_client_conn)"
|
||||
| "server login has been failing, try again later (server_login_retry)"
|
||||
| "query_wait_timeout"
|
||||
),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ShouldRetry for tokio_postgres::Error {
|
||||
@@ -542,6 +566,13 @@ impl ShouldRetry for tokio_postgres::Error {
|
||||
false
|
||||
}
|
||||
}
|
||||
fn cannot_retry(&self) -> bool {
|
||||
if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) {
|
||||
tokio_postgres::error::DbError::cannot_retry(db_err)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ShouldRetry for compute::ConnectionError {
|
||||
@@ -552,6 +583,13 @@ impl ShouldRetry for compute::ConnectionError {
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
fn cannot_retry(&self) -> bool {
|
||||
match self {
|
||||
compute::ConnectionError::Postgres(err) => err.cannot_retry(),
|
||||
compute::ConnectionError::CouldNotConnect(err) => err.cannot_retry(),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn retry_after(num_retries: u32) -> time::Duration {
|
||||
|
||||
@@ -365,6 +365,9 @@ impl ShouldRetry for TestConnectError {
|
||||
fn could_retry(&self) -> bool {
|
||||
self.retryable
|
||||
}
|
||||
fn cannot_retry(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
Reference in New Issue
Block a user