mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-19 10:30:37 +00:00
Compare commits
2 Commits
proxy-prot
...
cannot-ret
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ca461ce2fd | ||
|
|
b497d0094e |
@@ -14,7 +14,6 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include <sys/file.h>
|
#include <sys/file.h>
|
||||||
#include <sys/statvfs.h>
|
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
|
|
||||||
@@ -38,9 +37,6 @@
|
|||||||
#include "storage/fd.h"
|
#include "storage/fd.h"
|
||||||
#include "storage/pg_shmem.h"
|
#include "storage/pg_shmem.h"
|
||||||
#include "storage/buf_internals.h"
|
#include "storage/buf_internals.h"
|
||||||
#include "storage/procsignal.h"
|
|
||||||
#include "postmaster/bgworker.h"
|
|
||||||
#include "postmaster/interrupt.h"
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Local file cache is used to temporary store relations pages in local file system.
|
* Local file cache is used to temporary store relations pages in local file system.
|
||||||
@@ -66,9 +62,6 @@
|
|||||||
|
|
||||||
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
|
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
|
||||||
|
|
||||||
#define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */
|
|
||||||
#define MAX_DISK_WRITE_RATE 1000 /* MB/sec */
|
|
||||||
|
|
||||||
typedef struct FileCacheEntry
|
typedef struct FileCacheEntry
|
||||||
{
|
{
|
||||||
BufferTag key;
|
BufferTag key;
|
||||||
@@ -91,14 +84,12 @@ static int lfc_desc = 0;
|
|||||||
static LWLockId lfc_lock;
|
static LWLockId lfc_lock;
|
||||||
static int lfc_max_size;
|
static int lfc_max_size;
|
||||||
static int lfc_size_limit;
|
static int lfc_size_limit;
|
||||||
static int lfc_free_space_watermark;
|
|
||||||
static char* lfc_path;
|
static char* lfc_path;
|
||||||
static FileCacheControl* lfc_ctl;
|
static FileCacheControl* lfc_ctl;
|
||||||
static shmem_startup_hook_type prev_shmem_startup_hook;
|
static shmem_startup_hook_type prev_shmem_startup_hook;
|
||||||
#if PG_VERSION_NUM>=150000
|
#if PG_VERSION_NUM>=150000
|
||||||
static shmem_request_hook_type prev_shmem_request_hook;
|
static shmem_request_hook_type prev_shmem_request_hook;
|
||||||
#endif
|
#endif
|
||||||
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark is reached */
|
|
||||||
|
|
||||||
void FileCacheMonitorMain(Datum main_arg);
|
void FileCacheMonitorMain(Datum main_arg);
|
||||||
|
|
||||||
@@ -254,80 +245,6 @@ lfc_change_limit_hook(int newval, void *extra)
|
|||||||
LWLockRelease(lfc_lock);
|
LWLockRelease(lfc_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* Local file system state monitor check available free space.
|
|
||||||
* If it is lower than lfc_free_space_watermark then we shrink size of local cache
|
|
||||||
* but throwing away least recently accessed chunks.
|
|
||||||
* First time low space watermark is reached cache size is divided by two,
|
|
||||||
* second time by four,... Finally we remove all chunks from local cache.
|
|
||||||
*
|
|
||||||
* Please notice that we are not changing lfc_cache_size: it is used to be adjusted by autoscaler.
|
|
||||||
* We only throw away cached chunks but do not prevent from filling cache by new chunks.
|
|
||||||
*
|
|
||||||
* Interval of poooling cache state is calculated as minimal time needed to consume lfc_free_space_watermark
|
|
||||||
* disk space with maximal possible disk write speed (1Gb/sec). But not larger than 1 second.
|
|
||||||
* Calling statvfs each second should not add any noticeable overhead.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
FileCacheMonitorMain(Datum main_arg)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Choose file system state monitor interval so that space can not be exosted
|
|
||||||
* during this period but not longer than MAX_MONITOR_INTERVAL (10 sec)
|
|
||||||
*/
|
|
||||||
uint64 monitor_interval = Min(MAX_MONITOR_INTERVAL_USEC, lfc_free_space_watermark*MB/MAX_DISK_WRITE_RATE);
|
|
||||||
|
|
||||||
/* Establish signal handlers. */
|
|
||||||
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
|
|
||||||
pqsignal(SIGHUP, SignalHandlerForConfigReload);
|
|
||||||
pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
|
|
||||||
BackgroundWorkerUnblockSignals();
|
|
||||||
|
|
||||||
/* Periodically dump buffers until terminated. */
|
|
||||||
while (!ShutdownRequestPending)
|
|
||||||
{
|
|
||||||
if (lfc_size_limit != 0)
|
|
||||||
{
|
|
||||||
struct statvfs sfs;
|
|
||||||
if (statvfs(lfc_path, &sfs) < 0)
|
|
||||||
{
|
|
||||||
elog(WARNING, "Failed to obtain status of %s: %m", lfc_path);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (sfs.f_bavail*sfs.f_bsize < lfc_free_space_watermark*MB)
|
|
||||||
{
|
|
||||||
if (lfc_shrinking_factor < 31) {
|
|
||||||
lfc_shrinking_factor += 1;
|
|
||||||
}
|
|
||||||
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
lfc_shrinking_factor = 0; /* reset to initial value */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pg_usleep(monitor_interval);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
lfc_register_free_space_monitor(void)
|
|
||||||
{
|
|
||||||
BackgroundWorker bgw;
|
|
||||||
memset(&bgw, 0, sizeof(bgw));
|
|
||||||
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
|
|
||||||
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
|
|
||||||
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
|
|
||||||
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "FileCacheMonitorMain");
|
|
||||||
snprintf(bgw.bgw_name, BGW_MAXLEN, "Local free space monitor");
|
|
||||||
snprintf(bgw.bgw_type, BGW_MAXLEN, "Local free space monitor");
|
|
||||||
bgw.bgw_restart_time = 5;
|
|
||||||
bgw.bgw_notify_pid = 0;
|
|
||||||
bgw.bgw_main_arg = (Datum) 0;
|
|
||||||
|
|
||||||
RegisterBackgroundWorker(&bgw);
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
void
|
||||||
lfc_init(void)
|
lfc_init(void)
|
||||||
{
|
{
|
||||||
@@ -364,19 +281,6 @@ lfc_init(void)
|
|||||||
lfc_change_limit_hook,
|
lfc_change_limit_hook,
|
||||||
NULL);
|
NULL);
|
||||||
|
|
||||||
DefineCustomIntVariable("neon.free_space_watermark",
|
|
||||||
"Minimal free space in local file system after reaching which local file cache will be truncated",
|
|
||||||
NULL,
|
|
||||||
&lfc_free_space_watermark,
|
|
||||||
1024, /* 1GB */
|
|
||||||
0,
|
|
||||||
INT_MAX,
|
|
||||||
PGC_SIGHUP,
|
|
||||||
GUC_UNIT_MB,
|
|
||||||
NULL,
|
|
||||||
NULL,
|
|
||||||
NULL);
|
|
||||||
|
|
||||||
DefineCustomStringVariable("neon.file_cache_path",
|
DefineCustomStringVariable("neon.file_cache_path",
|
||||||
"Path to local file cache (can be raw device)",
|
"Path to local file cache (can be raw device)",
|
||||||
NULL,
|
NULL,
|
||||||
@@ -391,9 +295,6 @@ lfc_init(void)
|
|||||||
if (lfc_max_size == 0)
|
if (lfc_max_size == 0)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
if (lfc_free_space_watermark != 0)
|
|
||||||
lfc_register_free_space_monitor();
|
|
||||||
|
|
||||||
prev_shmem_startup_hook = shmem_startup_hook;
|
prev_shmem_startup_hook = shmem_startup_hook;
|
||||||
shmem_startup_hook = lfc_shmem_startup;
|
shmem_startup_hook = lfc_shmem_startup;
|
||||||
#if PG_VERSION_NUM>=150000
|
#if PG_VERSION_NUM>=150000
|
||||||
|
|||||||
@@ -95,6 +95,20 @@ pub mod errors {
|
|||||||
_ => false,
|
_ => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn cannot_retry(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
// retry some transport errors
|
||||||
|
Self::Transport(io) => io.cannot_retry(),
|
||||||
|
// locked can be returned when the endpoint was in transition
|
||||||
|
// or when quotas are exceeded. don't retry when quotas are exceeded
|
||||||
|
Self::Console {
|
||||||
|
status: http::StatusCode::LOCKED,
|
||||||
|
ref text,
|
||||||
|
} => text.contains("quota"),
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<reqwest::Error> for ApiError {
|
impl From<reqwest::Error> for ApiError {
|
||||||
|
|||||||
@@ -129,18 +129,12 @@ impl<T: AsyncRead> WithClientIp<T> {
|
|||||||
// exit for bad header
|
// exit for bad header
|
||||||
let len = usize::min(self.buf.len(), HEADER.len());
|
let len = usize::min(self.buf.len(), HEADER.len());
|
||||||
if self.buf[..len] != HEADER[..len] {
|
if self.buf[..len] != HEADER[..len] {
|
||||||
return Poll::Ready(Err(io::Error::new(
|
return Poll::Ready(Ok(None));
|
||||||
io::ErrorKind::InvalidInput,
|
|
||||||
"invalid proxy protocol v2 header",
|
|
||||||
)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// if no more bytes available then exit
|
// if no more bytes available then exit
|
||||||
if ready!(bytes_read) == 0 {
|
if ready!(bytes_read) == 0 {
|
||||||
return Poll::Ready(Err(io::Error::new(
|
return Poll::Ready(Ok(None));
|
||||||
io::ErrorKind::UnexpectedEof,
|
|
||||||
"missing proxy protocol v2 header",
|
|
||||||
)));
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -152,27 +146,27 @@ impl<T: AsyncRead> WithClientIp<T> {
|
|||||||
let command = vc & 0b1111;
|
let command = vc & 0b1111;
|
||||||
if version != 2 {
|
if version != 2 {
|
||||||
return Poll::Ready(Err(io::Error::new(
|
return Poll::Ready(Err(io::Error::new(
|
||||||
io::ErrorKind::InvalidInput,
|
io::ErrorKind::Other,
|
||||||
"invalid proxy protocol version. expected version 2",
|
"invalid proxy protocol version. expected version 2",
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
let local = match command {
|
match command {
|
||||||
// the connection was established on purpose by the proxy
|
// the connection was established on purpose by the proxy
|
||||||
// without being relayed. The connection endpoints are the sender and the
|
// without being relayed. The connection endpoints are the sender and the
|
||||||
// receiver. Such connections exist when the proxy sends health-checks to the
|
// receiver. Such connections exist when the proxy sends health-checks to the
|
||||||
// server. The receiver must accept this connection as valid and must use the
|
// server. The receiver must accept this connection as valid and must use the
|
||||||
// real connection endpoints and discard the protocol block including the
|
// real connection endpoints and discard the protocol block including the
|
||||||
// family which is ignored.
|
// family which is ignored.
|
||||||
0 => true,
|
0 => {}
|
||||||
// the connection was established on behalf of another node,
|
// the connection was established on behalf of another node,
|
||||||
// and reflects the original connection endpoints. The receiver must then use
|
// and reflects the original connection endpoints. The receiver must then use
|
||||||
// the information provided in the protocol block to get original the address.
|
// the information provided in the protocol block to get original the address.
|
||||||
1 => false,
|
1 => {}
|
||||||
// other values are unassigned and must not be emitted by senders. Receivers
|
// other values are unassigned and must not be emitted by senders. Receivers
|
||||||
// must drop connections presenting unexpected values here.
|
// must drop connections presenting unexpected values here.
|
||||||
_ => {
|
_ => {
|
||||||
return Poll::Ready(Err(io::Error::new(
|
return Poll::Ready(Err(io::Error::new(
|
||||||
io::ErrorKind::InvalidInput,
|
io::ErrorKind::Other,
|
||||||
"invalid proxy protocol command. expected local (0) or proxy (1)",
|
"invalid proxy protocol command. expected local (0) or proxy (1)",
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
@@ -192,29 +186,8 @@ impl<T: AsyncRead> WithClientIp<T> {
|
|||||||
// - \x22 : UDP over IPv6 : the forwarded connection uses UDP over the AF_INET6
|
// - \x22 : UDP over IPv6 : the forwarded connection uses UDP over the AF_INET6
|
||||||
// protocol family. Address length is 2*16 + 2*2 = 36 bytes.
|
// protocol family. Address length is 2*16 + 2*2 = 36 bytes.
|
||||||
0x21 | 0x22 => 36,
|
0x21 | 0x22 => 36,
|
||||||
|
// unspecified or unix stream. ignore the addresses
|
||||||
// - \x31 : UNIX stream : the forwarded connection uses SOCK_STREAM over the
|
_ => 0,
|
||||||
// AF_UNIX protocol family. Address length is 2*108 = 216 bytes.
|
|
||||||
// - \x32 : UNIX datagram : the forwarded connection uses SOCK_DGRAM over the
|
|
||||||
// AF_UNIX protocol family. Address length is 2*108 = 216 bytes.
|
|
||||||
0x31 | 0x32 => 216,
|
|
||||||
|
|
||||||
// UNSPEC : the connection is forwarded for an unknown, unspecified
|
|
||||||
// or unsupported protocol. The sender should use this family when sending
|
|
||||||
// LOCAL commands or when dealing with unsupported protocol families. When
|
|
||||||
// used with a LOCAL command, the receiver must accept the connection and
|
|
||||||
// ignore any address information. For other commands, the receiver is free
|
|
||||||
// to accept the connection anyway and use the real endpoints addresses or to
|
|
||||||
// reject the connection. The receiver should ignore address information.
|
|
||||||
0x00 | 0x01 | 0x02 | 0x10 | 0x20 | 0x30 if local => 0,
|
|
||||||
|
|
||||||
// unspecified or invalid. ignore the addresses
|
|
||||||
_ => {
|
|
||||||
return Poll::Ready(Err(io::Error::new(
|
|
||||||
io::ErrorKind::InvalidInput,
|
|
||||||
"invalid proxy protocol address family/transport protocol",
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// The 15th and 16th bytes is the address length in bytes in network endian order.
|
// The 15th and 16th bytes is the address length in bytes in network endian order.
|
||||||
@@ -446,7 +419,6 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
#[should_panic(expected = "invalid proxy protocol v2 header")]
|
|
||||||
async fn test_invalid() {
|
async fn test_invalid() {
|
||||||
let data = [0x55; 256];
|
let data = [0x55; 256];
|
||||||
|
|
||||||
@@ -454,15 +426,20 @@ mod tests {
|
|||||||
|
|
||||||
let mut bytes = vec![];
|
let mut bytes = vec![];
|
||||||
read.read_to_end(&mut bytes).await.unwrap();
|
read.read_to_end(&mut bytes).await.unwrap();
|
||||||
|
assert_eq!(bytes, data);
|
||||||
|
assert_eq!(read.state, ProxyParse::None);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
#[should_panic(expected = "missing proxy protocol v2 header")]
|
|
||||||
async fn test_short() {
|
async fn test_short() {
|
||||||
let mut read = pin!(WithClientIp::new(&super::HEADER.as_slice()[..10]));
|
let data = [0x55; 10];
|
||||||
|
|
||||||
|
let mut read = pin!(WithClientIp::new(data.as_slice()));
|
||||||
|
|
||||||
let mut bytes = vec![];
|
let mut bytes = vec![];
|
||||||
read.read_to_end(&mut bytes).await.unwrap();
|
read.read_to_end(&mut bytes).await.unwrap();
|
||||||
|
assert_eq!(bytes, data);
|
||||||
|
assert_eq!(read.state, ProxyParse::None);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
|||||||
@@ -414,6 +414,9 @@ where
|
|||||||
Ok(res) => return Ok(res),
|
Ok(res) => return Ok(res),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(error = ?e, "could not connect to compute node");
|
error!(error = ?e, "could not connect to compute node");
|
||||||
|
if e.cannot_retry() {
|
||||||
|
return Err(e.into());
|
||||||
|
}
|
||||||
(invalidate_cache(node_info), e)
|
(invalidate_cache(node_info), e)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -501,6 +504,8 @@ pub fn handle_try_wake(
|
|||||||
|
|
||||||
pub trait ShouldRetry {
|
pub trait ShouldRetry {
|
||||||
fn could_retry(&self) -> bool;
|
fn could_retry(&self) -> bool;
|
||||||
|
/// return true if retrying definitely won't make a difference - or is harmful
|
||||||
|
fn cannot_retry(&self) -> bool;
|
||||||
fn should_retry(&self, num_retries: u32) -> bool {
|
fn should_retry(&self, num_retries: u32) -> bool {
|
||||||
match self {
|
match self {
|
||||||
_ if num_retries >= NUM_RETRIES_CONNECT => false,
|
_ if num_retries >= NUM_RETRIES_CONNECT => false,
|
||||||
@@ -517,6 +522,9 @@ impl ShouldRetry for io::Error {
|
|||||||
ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable | ErrorKind::TimedOut
|
ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable | ErrorKind::TimedOut
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
fn cannot_retry(&self) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ShouldRetry for tokio_postgres::error::DbError {
|
impl ShouldRetry for tokio_postgres::error::DbError {
|
||||||
@@ -530,6 +538,22 @@ impl ShouldRetry for tokio_postgres::error::DbError {
|
|||||||
| &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
|
| &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
fn cannot_retry(&self) -> bool {
|
||||||
|
use tokio_postgres::error::SqlState;
|
||||||
|
match self.code() {
|
||||||
|
&SqlState::TOO_MANY_CONNECTIONS
|
||||||
|
| &SqlState::INVALID_CATALOG_NAME
|
||||||
|
| &SqlState::INVALID_PASSWORD => true,
|
||||||
|
// pgbouncer errors?
|
||||||
|
&SqlState::PROTOCOL_VIOLATION => matches!(
|
||||||
|
self.message(),
|
||||||
|
"no more connections allowed (max_client_conn)"
|
||||||
|
| "server login has been failing, try again later (server_login_retry)"
|
||||||
|
| "query_wait_timeout"
|
||||||
|
),
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ShouldRetry for tokio_postgres::Error {
|
impl ShouldRetry for tokio_postgres::Error {
|
||||||
@@ -542,6 +566,13 @@ impl ShouldRetry for tokio_postgres::Error {
|
|||||||
false
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fn cannot_retry(&self) -> bool {
|
||||||
|
if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) {
|
||||||
|
tokio_postgres::error::DbError::cannot_retry(db_err)
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ShouldRetry for compute::ConnectionError {
|
impl ShouldRetry for compute::ConnectionError {
|
||||||
@@ -552,6 +583,13 @@ impl ShouldRetry for compute::ConnectionError {
|
|||||||
_ => false,
|
_ => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fn cannot_retry(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
compute::ConnectionError::Postgres(err) => err.cannot_retry(),
|
||||||
|
compute::ConnectionError::CouldNotConnect(err) => err.cannot_retry(),
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn retry_after(num_retries: u32) -> time::Duration {
|
pub fn retry_after(num_retries: u32) -> time::Duration {
|
||||||
|
|||||||
@@ -365,6 +365,9 @@ impl ShouldRetry for TestConnectError {
|
|||||||
fn could_retry(&self) -> bool {
|
fn could_retry(&self) -> bool {
|
||||||
self.retryable
|
self.retryable
|
||||||
}
|
}
|
||||||
|
fn cannot_retry(&self) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|||||||
Reference in New Issue
Block a user