Compare commits


2 Commits

Author SHA1 Message Date
Conrad Ludgate
ca461ce2fd proxy: force no retry some errors 2023-09-29 17:33:13 +01:00
Em Sharnoff
b497d0094e file cache: Remove free space monitor (#5406)
This effectively reverts #3832.

There are a couple of issues we just discovered with the free space
monitor, and to my knowledge, the fact that we're putting the file cache
on a separate filesystem (even when on disk) that's guaranteed to have
more room than the maximum cache size means that the free space monitor
should have no effect.

More details:

1. The control plane sets the maximum file cache size based on max CU
2. The control plane sets the size of the filesystem underlying the file
cache based on the maximum user-selectable CU (or, if the endpoint is
larger, then that size), so that there's always enough room
3. If postmaster gets SIGKILL'd, then the free space monitor process
does not exit
4. If the free space monitor acts on the cache file without being subject
to the locking or up-to-date metadata of a newer postgres instance,
this could lead to data corruption.

So, in practice I believe the risk of data corruption is *low* but not
zero, and given the issues we hit because of (3), and given that the
free space monitor shouldn't be necessary because of (1) and (2),
it's best to just remove it outright.

See also: neondatabase/autoscaling#534, #5405
2023-09-28 06:47:44 -07:00
5 changed files with 71 additions and 138 deletions

View File

@@ -14,7 +14,6 @@
*/
#include <sys/file.h>
#include <sys/statvfs.h>
#include <unistd.h>
#include <fcntl.h>
@@ -38,9 +37,6 @@
#include "storage/fd.h"
#include "storage/pg_shmem.h"
#include "storage/buf_internals.h"
#include "storage/procsignal.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
/*
* Local file cache is used to temporarily store relation pages in the local file system.
@@ -66,9 +62,6 @@
#define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK))
#define MAX_MONITOR_INTERVAL_USEC 1000000 /* 1 second */
#define MAX_DISK_WRITE_RATE 1000 /* MB/sec */
typedef struct FileCacheEntry
{
BufferTag key;
@@ -91,14 +84,12 @@ static int lfc_desc = 0;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_free_space_watermark;
static char* lfc_path;
static FileCacheControl* lfc_ctl;
static shmem_startup_hook_type prev_shmem_startup_hook;
#if PG_VERSION_NUM>=150000
static shmem_request_hook_type prev_shmem_request_hook;
#endif
static int lfc_shrinking_factor; /* power of two by which the local cache size will be shrunk when lfc_free_space_watermark is reached */
void FileCacheMonitorMain(Datum main_arg);
@@ -254,80 +245,6 @@ lfc_change_limit_hook(int newval, void *extra)
LWLockRelease(lfc_lock);
}
/*
* The local file system state monitor checks available free space.
* If it is lower than lfc_free_space_watermark, we shrink the local cache
* by throwing away the least recently accessed chunks.
* The first time the low space watermark is reached, the cache size is divided
* by two, the second time by four, ... until finally all chunks are removed.
*
* Please note that we are not changing lfc_cache_size: it is adjusted by the autoscaler.
* We only throw away cached chunks; we do not prevent the cache from being filled with new chunks.
*
* The polling interval is calculated as the minimal time needed to consume
* lfc_free_space_watermark of disk space at the maximal assumed disk write speed
* (1 GB/sec), but no larger than 1 second. For example, with the default 1024 MB
* watermark the computed interval is just over 1 second, so the cap applies.
* Calling statvfs each second should not add any noticeable overhead.
*/
void
FileCacheMonitorMain(Datum main_arg)
{
/*
* Choose the file system state monitor interval so that space cannot be exhausted
* during this period, but no longer than MAX_MONITOR_INTERVAL_USEC (1 second)
*/
uint64 monitor_interval = Min(MAX_MONITOR_INTERVAL_USEC, lfc_free_space_watermark*MB/MAX_DISK_WRITE_RATE);
/* Establish signal handlers. */
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, SignalHandlerForShutdownRequest);
BackgroundWorkerUnblockSignals();
/* Periodically check free space until terminated. */
while (!ShutdownRequestPending)
{
if (lfc_size_limit != 0)
{
struct statvfs sfs;
if (statvfs(lfc_path, &sfs) < 0)
{
elog(WARNING, "Failed to obtain status of %s: %m", lfc_path);
}
else
{
if (sfs.f_bavail*sfs.f_bsize < lfc_free_space_watermark*MB)
{
if (lfc_shrinking_factor < 31) {
lfc_shrinking_factor += 1;
}
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
}
else
lfc_shrinking_factor = 0; /* reset to initial value */
}
}
pg_usleep(monitor_interval);
}
}
static void
lfc_register_free_space_monitor(void)
{
BackgroundWorker bgw;
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon");
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "FileCacheMonitorMain");
snprintf(bgw.bgw_name, BGW_MAXLEN, "Local free space monitor");
snprintf(bgw.bgw_type, BGW_MAXLEN, "Local free space monitor");
bgw.bgw_restart_time = 5;
bgw.bgw_notify_pid = 0;
bgw.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&bgw);
}
void
lfc_init(void)
{
@@ -364,19 +281,6 @@ lfc_init(void)
lfc_change_limit_hook,
NULL);
DefineCustomIntVariable("neon.free_space_watermark",
"Minimal free space in local file system after reaching which local file cache will be truncated",
NULL,
&lfc_free_space_watermark,
1024, /* 1GB */
0,
INT_MAX,
PGC_SIGHUP,
GUC_UNIT_MB,
NULL,
NULL,
NULL);
DefineCustomStringVariable("neon.file_cache_path",
"Path to local file cache (can be raw device)",
NULL,
@@ -391,9 +295,6 @@ lfc_init(void)
if (lfc_max_size == 0)
return;
if (lfc_free_space_watermark != 0)
lfc_register_free_space_monitor();
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = lfc_shmem_startup;
#if PG_VERSION_NUM>=150000

View File

@@ -95,6 +95,20 @@ pub mod errors {
_ => false,
}
}
fn cannot_retry(&self) -> bool {
match self {
// delegate to the transport error's own classification
Self::Transport(io) => io.cannot_retry(),
// locked can be returned when the endpoint was in transition
// or when quotas are exceeded. don't retry when quotas are exceeded
Self::Console {
status: http::StatusCode::LOCKED,
ref text,
} => text.contains("quota"),
_ => false,
}
}
}
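
For illustration, the LOCKED classification above can be modeled with a minimal, self-contained sketch. The enum, status type, and response texts below are stand-ins, not the proxy's actual definitions: a 423 response whose text mentions quotas is terminal, while a plain LOCKED from an endpoint still in transition remains retryable.

    // Stand-in types modeling the classification above; not the real ApiError.
    enum ApiError {
        Console { status: u16, text: String },
    }

    impl ApiError {
        fn cannot_retry(&self) -> bool {
            match self {
                // 423 Locked: give up only if the message blames quotas
                ApiError::Console { status: 423, text } => text.contains("quota"),
                _ => false,
            }
        }
    }

    fn main() {
        // hypothetical response texts, for illustration only
        let quota = ApiError::Console { status: 423, text: "quota exceeded".into() };
        let busy = ApiError::Console { status: 423, text: "endpoint is in transition".into() };
        assert!(quota.cannot_retry());
        assert!(!busy.cannot_retry());
    }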
impl From<reqwest::Error> for ApiError {

View File

@@ -129,18 +129,12 @@ impl<T: AsyncRead> WithClientIp<T> {
// exit for bad header
let len = usize::min(self.buf.len(), HEADER.len());
if self.buf[..len] != HEADER[..len] {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid proxy protocol v2 header",
)));
return Poll::Ready(Ok(None));
}
// if no more bytes available then exit
if ready!(bytes_read) == 0 {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"missing proxy protocol v2 header",
)));
return Poll::Ready(Ok(None));
};
}
@@ -152,27 +146,27 @@ impl<T: AsyncRead> WithClientIp<T> {
let command = vc & 0b1111;
if version != 2 {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
io::ErrorKind::Other,
"invalid proxy protocol version. expected version 2",
)));
}
let local = match command {
match command {
// the connection was established on purpose by the proxy
// without being relayed. The connection endpoints are the sender and the
// receiver. Such connections exist when the proxy sends health-checks to the
// server. The receiver must accept this connection as valid and must use the
// real connection endpoints and discard the protocol block including the
// family which is ignored.
0 => true,
0 => {}
// the connection was established on behalf of another node,
// and reflects the original connection endpoints. The receiver must then use
// the information provided in the protocol block to get the original address.
1 => false,
1 => {}
// other values are unassigned and must not be emitted by senders. Receivers
// must drop connections presenting unexpected values here.
_ => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
io::ErrorKind::Other,
"invalid proxy protocol command. expected local (0) or proxy (1)",
)))
}
@@ -192,29 +186,8 @@ impl<T: AsyncRead> WithClientIp<T> {
// - \x22 : UDP over IPv6 : the forwarded connection uses UDP over the AF_INET6
// protocol family. Address length is 2*16 + 2*2 = 36 bytes.
0x21 | 0x22 => 36,
// - \x31 : UNIX stream : the forwarded connection uses SOCK_STREAM over the
// AF_UNIX protocol family. Address length is 2*108 = 216 bytes.
// - \x32 : UNIX datagram : the forwarded connection uses SOCK_DGRAM over the
// AF_UNIX protocol family. Address length is 2*108 = 216 bytes.
0x31 | 0x32 => 216,
// UNSPEC : the connection is forwarded for an unknown, unspecified
// or unsupported protocol. The sender should use this family when sending
// LOCAL commands or when dealing with unsupported protocol families. When
// used with a LOCAL command, the receiver must accept the connection and
// ignore any address information. For other commands, the receiver is free
// to accept the connection anyway and use the real endpoints addresses or to
// reject the connection. The receiver should ignore address information.
0x00 | 0x01 | 0x02 | 0x10 | 0x20 | 0x30 if local => 0,
// unspecified or invalid. ignore the addresses
_ => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"invalid proxy protocol address family/transport protocol",
)))
}
// unspecified or unix stream. ignore the addresses
_ => 0,
};
// The 15th and 16th bytes are the address length in bytes, in network byte order.
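
As a reference for the parsing in these hunks, the fixed 16-byte proxy protocol v2 prefix can be sketched as below. This follows the published spec; the constant and function names are local to the sketch, not the crate's API (the real code above parses incrementally against its HEADER constant).

    // Sketch of the proxy protocol v2 prefix layout, per the spec.
    const SIGNATURE: [u8; 12] = *b"\r\n\r\n\x00\r\nQUIT\n";

    fn parse_prefix(buf: &[u8; 16]) -> Result<(u8, u8, u8, u16), &'static str> {
        if buf[..12] != SIGNATURE {
            return Err("not a proxy protocol v2 header");
        }
        let version = buf[12] >> 4;   // high nibble: must be 2
        let command = buf[12] & 0x0f; // low nibble: 0 = LOCAL, 1 = PROXY
        let family = buf[13];         // e.g. 0x11 = TCP over IPv4 (12-byte address block)
        // the 15th and 16th bytes: length of the address block, big endian
        let len = u16::from_be_bytes([buf[14], buf[15]]);
        Ok((version, command, family, len))
    }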
@@ -446,7 +419,6 @@ mod tests {
}
#[tokio::test]
#[should_panic(expected = "invalid proxy protocol v2 header")]
async fn test_invalid() {
let data = [0x55; 256];
@@ -454,15 +426,20 @@ mod tests {
let mut bytes = vec![];
read.read_to_end(&mut bytes).await.unwrap();
assert_eq!(bytes, data);
assert_eq!(read.state, ProxyParse::None);
}
#[tokio::test]
#[should_panic(expected = "missing proxy protocol v2 header")]
async fn test_short() {
let mut read = pin!(WithClientIp::new(&super::HEADER.as_slice()[..10]));
let data = [0x55; 10];
let mut read = pin!(WithClientIp::new(data.as_slice()));
let mut bytes = vec![];
read.read_to_end(&mut bytes).await.unwrap();
assert_eq!(bytes, data);
assert_eq!(read.state, ProxyParse::None);
}
#[tokio::test]

View File

@@ -414,6 +414,9 @@ where
Ok(res) => return Ok(res),
Err(e) => {
error!(error = ?e, "could not connect to compute node");
if e.cannot_retry() {
return Err(e.into());
}
(invalidate_cache(node_info), e)
}
};
@@ -501,6 +504,8 @@ pub fn handle_try_wake(
pub trait ShouldRetry {
fn could_retry(&self) -> bool;
/// return true if retrying definitely won't make a difference, or would be harmful
fn cannot_retry(&self) -> bool;
fn should_retry(&self, num_retries: u32) -> bool {
match self {
_ if num_retries >= NUM_RETRIES_CONNECT => false,
@@ -517,6 +522,9 @@ impl ShouldRetry for io::Error {
ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable | ErrorKind::TimedOut
)
}
fn cannot_retry(&self) -> bool {
false
}
}
impl ShouldRetry for tokio_postgres::error::DbError {
@@ -530,6 +538,22 @@ impl ShouldRetry for tokio_postgres::error::DbError {
| &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
)
}
fn cannot_retry(&self) -> bool {
use tokio_postgres::error::SqlState;
match self.code() {
&SqlState::TOO_MANY_CONNECTIONS
| &SqlState::INVALID_CATALOG_NAME
| &SqlState::INVALID_PASSWORD => true,
// pgbouncer errors?
&SqlState::PROTOCOL_VIOLATION => matches!(
self.message(),
"no more connections allowed (max_client_conn)"
| "server login has been failing, try again later (server_login_retry)"
| "query_wait_timeout"
),
_ => false,
}
}
}
impl ShouldRetry for tokio_postgres::Error {
@@ -542,6 +566,13 @@ impl ShouldRetry for tokio_postgres::Error {
false
}
}
fn cannot_retry(&self) -> bool {
if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) {
tokio_postgres::error::DbError::cannot_retry(db_err)
} else {
false
}
}
}
impl ShouldRetry for compute::ConnectionError {
@@ -552,6 +583,13 @@ impl ShouldRetry for compute::ConnectionError {
_ => false,
}
}
fn cannot_retry(&self) -> bool {
match self {
compute::ConnectionError::Postgres(err) => err.cannot_retry(),
compute::ConnectionError::CouldNotConnect(err) => err.cannot_retry(),
_ => false,
}
}
}
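
To make the control flow concrete, here is a hedged, self-contained sketch of how cannot_retry short-circuits the connect loop from the first hunk of this file, while should_retry gates further attempts. The error type, the constant's value, and the backoff are illustrative stand-ins, not the proxy's actual definitions.

    use std::time::Duration;

    const NUM_RETRIES_CONNECT: u32 = 10; // assumed value; stands in for the real constant

    struct ConnectError { transient: bool, terminal: bool }

    impl ConnectError {
        fn could_retry(&self) -> bool { self.transient }
        fn cannot_retry(&self) -> bool { self.terminal }
        fn should_retry(&self, num_retries: u32) -> bool {
            num_retries < NUM_RETRIES_CONNECT && self.could_retry()
        }
    }

    fn retry_after(num_retries: u32) -> Duration {
        // illustrative backoff; the real retry_after is defined just below
        Duration::from_millis(100 << num_retries.min(6))
    }

    fn connect_with_retries(mut attempt: impl FnMut() -> Result<(), ConnectError>) -> Result<(), ConnectError> {
        let mut num_retries = 0;
        loop {
            match attempt() {
                Ok(()) => return Ok(()),
                // e.g. invalid password or exceeded quota: retrying is useless or harmful
                Err(e) if e.cannot_retry() => return Err(e),
                // transient errors back off and try again, up to the retry budget
                Err(e) if e.should_retry(num_retries) => {
                    std::thread::sleep(retry_after(num_retries));
                    num_retries += 1;
                }
                Err(e) => return Err(e),
            }
        }
    }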
pub fn retry_after(num_retries: u32) -> time::Duration {

View File

@@ -365,6 +365,9 @@ impl ShouldRetry for TestConnectError {
fn could_retry(&self) -> bool {
self.retryable
}
fn cannot_retry(&self) -> bool {
false
}
}
#[async_trait]