From 4d41b2d3799bb704041c9bfbc9a7f57e86c68916 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 23 May 2023 15:29:59 +0200 Subject: [PATCH 1/8] fix: `max_lsn_wal_lag` broken in tenant conf (#4279) This patch fixes parsing of the `max_lsn_wal_lag` tenant config item. We were incorrectly expecting a string before, but the type is a NonZeroU64. So, when setting it in the config, the (updated) test case would fail with ``` E psycopg2.errors.InternalError_: Tenant a1fa9cc383e32ddafb73ff920de5f2e6 will not become active. Current state: Broken due to: Failed to parse config from file '.../repo/tenants/a1fa9cc383e32ddafb73ff920de5f2e6/config' as pageserver config: configure option max_lsn_wal_lag is not a string. Backtrace: ``` So, not even the assertions added are necessary. The test coverage for tenant config is rather thin in general. For example, the `test_tenant_conf.py` test doesn't cover all the options. I'll add a new regression test as part of attach-time-tenant-conf PR https://github.com/neondatabase/neon/pull/4255 --- pageserver/src/config.rs | 3 ++- test_runner/regress/test_tenant_conf.py | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 9e341230cf..88a7f15b21 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -797,7 +797,8 @@ impl PageServerConf { )?); } if let Some(max_lsn_wal_lag) = item.get("max_lsn_wal_lag") { - t_conf.max_lsn_wal_lag = Some(parse_toml_from_str("max_lsn_wal_lag", max_lsn_wal_lag)?); + t_conf.max_lsn_wal_lag = + Some(deserialize_from_item("max_lsn_wal_lag", max_lsn_wal_lag)?); } if let Some(trace_read_requests) = item.get("trace_read_requests") { t_conf.trace_read_requests = diff --git a/test_runner/regress/test_tenant_conf.py b/test_runner/regress/test_tenant_conf.py index 8677a554f7..dc523364dc 100644 --- a/test_runner/regress/test_tenant_conf.py +++ b/test_runner/regress/test_tenant_conf.py @@ -151,6 +151,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold = "eviction_policy": json.dumps( {"kind": "LayerAccessThreshold", "period": "80s", "threshold": "42h"} ), + "max_lsn_wal_lag": "13000000", } env.neon_cli.config_tenant( tenant_id=tenant, @@ -206,6 +207,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold = assert updated_effective_config["gc_horizon"] == 67108864 assert updated_effective_config["image_creation_threshold"] == 2 assert updated_effective_config["pitr_interval"] == "7days" + assert updated_effective_config["max_lsn_wal_lag"] == 13000000 # restart the pageserver and ensure that the config is still correct env.pageserver.stop() @@ -265,6 +267,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold = "period": "20s", "threshold": "23h", } + assert final_effective_config["max_lsn_wal_lag"] == 10 * 1024 * 1024 # restart the pageserver and ensure that the config is still correct env.pageserver.stop() From d75b4e0f1673f40708f7095700df87870d24cffb Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 23 May 2023 14:54:51 +0100 Subject: [PATCH 2/8] Bump requests from 2.28.1 to 2.31.0 (#4305) --- poetry.lock | 14 +++++++------- pyproject.toml | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/poetry.lock b/poetry.lock index 141371c925..23884f6252 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2092,21 +2092,21 @@ files = [ [[package]] name = "requests" -version = "2.28.1" +version = 
"2.31.0" description = "Python HTTP for Humans." category = "main" optional = false -python-versions = ">=3.7, <4" +python-versions = ">=3.7" files = [ - {file = "requests-2.28.1-py3-none-any.whl", hash = "sha256:8fefa2a1a1365bf5520aac41836fbee479da67864514bdb821f31ce07ce65349"}, - {file = "requests-2.28.1.tar.gz", hash = "sha256:7c5599b102feddaa661c826c56ab4fee28bfd17f5abca1ebbe3e7f19d7c97983"}, + {file = "requests-2.31.0-py3-none-any.whl", hash = "sha256:58cd2187c01e70e6e26505bca751777aa9f2ee0b7f4300988b709f44e013003f"}, + {file = "requests-2.31.0.tar.gz", hash = "sha256:942c5a758f98d790eaed1a29cb6eefc7ffb0d1cf7af05c3d2791656dbd6ad1e1"}, ] [package.dependencies] certifi = ">=2017.4.17" -charset-normalizer = ">=2,<3" +charset-normalizer = ">=2,<4" idna = ">=2.5,<4" -urllib3 = ">=1.21.1,<1.27" +urllib3 = ">=1.21.1,<3" [package.extras] socks = ["PySocks (>=1.5.6,!=1.5.7)"] @@ -2611,4 +2611,4 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>= [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "b689ffd6eae32b966f1744b5ac3343fe0dd26b31ee1f50e13daf5045ee0623e1" +content-hash = "a0bd73376a3e9479f2379265ccec8dd6ac9df2e525909d12b77d918d590fba55" diff --git a/pyproject.toml b/pyproject.toml index a51e91782e..574d247bf0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ pytest = "^6.2.5" psycopg2-binary = "^2.9.1" typing-extensions = "^4.1.0" PyJWT = {version = "^2.1.0", extras = ["crypto"]} -requests = "^2.26.0" +requests = "^2.31.0" pytest-xdist = "^3.0.2" asyncpg = "^0.27.0" aiopg = "^1.3.1" From dad35193514f6b50f90d6ef89a2df06d1f757ea3 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Fri, 14 Apr 2023 19:41:02 +0300 Subject: [PATCH 3/8] Add SQL-over-HTTP endpoint to Proxy This commit introduces an SQL-over-HTTP endpoint in the proxy, with a JSON response structure resembling that of the node-postgres driver. This method, using HTTP POST, achieves smaller amortized latencies in edge setups due to fewer round trips and an enhanced open connection reuse by the v8 engine. This update involves several intricacies: 1. SQL injection protection: We employed the extended query protocol, modifying the rust-postgres driver to send queries in one roundtrip using a text protocol rather than binary, bypassing potential issues like those identified in https://github.com/sfackler/rust-postgres/issues/1030. 2. Postgres type compatibility: As not all postgres types have binary representations (e.g., acl's in pg_class), we adjusted rust-postgres to respond with text protocol, simplifying serialization and fixing queries with text-only types in response. 3. Data type conversion: Considering JSON supports fewer data types than Postgres, we perform conversions where possible, passing all other types as strings. Key conversions include: - postgres int2, int4, float4, float8 -> json number (NaN and Inf remain text) - postgres bool, null, text -> json bool, null, string - postgres array -> json array - postgres json and jsonb -> json object 4. Alignment with node-postgres: To facilitate integration with js libraries, we've matched the response structure of node-postgres, returning command tags and column oids. Command tag capturing was added to the rust-postgres functionality as part of this change. 
--- Cargo.lock | 10 +- Cargo.toml | 12 +- proxy/README.md | 86 ++- proxy/src/config.rs | 5 +- proxy/src/http.rs | 1 + proxy/src/http/sql_over_http.rs | 603 ++++++++++++++++++ proxy/src/http/websocket.rs | 86 ++- test_runner/fixtures/neon_fixtures.py | 41 +- test_runner/regress/test_metric_collection.py | 2 + test_runner/regress/test_proxy.py | 128 +++- 10 files changed, 909 insertions(+), 65 deletions(-) create mode 100644 proxy/src/http/sql_over_http.rs diff --git a/Cargo.lock b/Cargo.lock index 55418473d5..4d63ebd99d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2820,7 +2820,7 @@ dependencies = [ [[package]] name = "postgres" version = "0.19.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9#2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" dependencies = [ "bytes", "fallible-iterator", @@ -2833,7 +2833,7 @@ dependencies = [ [[package]] name = "postgres-native-tls" version = "0.5.0" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9#2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" dependencies = [ "native-tls", "tokio", @@ -2844,7 +2844,7 @@ dependencies = [ [[package]] name = "postgres-protocol" version = "0.6.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9#2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" dependencies = [ "base64 0.20.0", "byteorder", @@ -2862,7 +2862,7 @@ dependencies = [ [[package]] name = "postgres-types" version = "0.2.4" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9#2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" dependencies = [ "bytes", "fallible-iterator", @@ -4321,7 +4321,7 @@ dependencies = [ [[package]] name = "tokio-postgres" version = "0.7.7" -source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f" +source = "git+https://github.com/neondatabase/rust-postgres.git?rev=2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9#2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" dependencies = [ "async-trait", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index c901532f86..7895459841 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,11 +126,11 @@ env_logger = "0.10" log = "0.4" ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed -postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } -postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } -postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } -postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } -tokio-postgres = 
{ git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } +postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" } +postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" } +postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" } +postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" } tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" } ## Other git libraries @@ -166,7 +166,7 @@ tonic-build = "0.9" # This is only needed for proxy's tests. # TODO: we should probably fork `tokio-postgres-rustls` instead. -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" } # Changes the MAX_THREADS limit from 4096 to 32768. # This is a temporary workaround for using tracing from many threads in safekeepers code, diff --git a/proxy/README.md b/proxy/README.md index 4ead098b73..cd76a2443f 100644 --- a/proxy/README.md +++ b/proxy/README.md @@ -1,6 +1,6 @@ # Proxy -Proxy binary accepts `--auth-backend` CLI option, which determines auth scheme and cluster routing method. Following backends are currently implemented: +Proxy binary accepts `--auth-backend` CLI option, which determines auth scheme and cluster routing method. Following routing backends are currently implemented: * console new SCRAM-based console API; uses SNI info to select the destination project (endpoint soon) @@ -9,6 +9,90 @@ Proxy binary accepts `--auth-backend` CLI option, which determines auth scheme a * link sends login link for all usernames +Also proxy can expose following services to the external world: + +* postgres protocol over TCP -- usual postgres endpoint compatible with usual + postgres drivers +* postgres protocol over WebSockets -- same protocol tunneled over websockets + for environments where TCP connection is not available. We have our own + implementation of a client that uses node-postgres and tunnels traffic through + websockets: https://github.com/neondatabase/serverless +* SQL over HTTP -- service that accepts POST requests with SQL text over HTTP + and responds with JSON-serialised results. + + +## SQL over HTTP + +Contrary to the usual postgres proto over TCP and WebSockets using plain +one-shot HTTP request achieves smaller amortized latencies in edge setups due to +fewer round trips and an enhanced open connection reuse by the v8 engine. Also +such endpoint could be used directly without any driver. 
+ +To play with it locally one may start proxy over a local postgres installation +(see end of this page on how to generate certs with openssl): + +``` +./target/debug/proxy -c server.crt -k server.key --auth-backend=postgres --auth-endpoint=postgres://stas@127.0.0.1:5432/stas --wss 0.0.0.0:4444 +``` + +If both postgres and proxy are running you may send a SQL query: +```json +curl -k -X POST 'https://proxy.localtest.me:4444/sql' \ + -H 'Neon-Connection-String: postgres://stas:pass@proxy.localtest.me:4444/postgres' \ + -H 'Content-Type: application/json' \ + --data '{ + "query":"SELECT $1::int[] as arr, $2::jsonb as obj, 42 as num", + "params":[ "{{1,2},{\"3\",4}}", {"key":"val", "ikey":4242}] + }' | jq + +{ + "command": "SELECT", + "fields": [ + { "dataTypeID": 1007, "name": "arr" }, + { "dataTypeID": 3802, "name": "obj" }, + { "dataTypeID": 23, "name": "num" } + ], + "rowCount": 1, + "rows": [ + { + "arr": [[1,2],[3,4]], + "num": 42, + "obj": { + "ikey": 4242, + "key": "val" + } + } + ] +} +``` + + +With the current approach we made the following design decisions: + +1. SQL injection protection: We employed the extended query protocol, modifying + the rust-postgres driver to send queries in one roundtrip using a text + protocol rather than binary, bypassing potential issues like those identified + in sfackler/rust-postgres#1030. + +2. Postgres type compatibility: As not all postgres types have binary + representations (e.g., acl's in pg_class), we adjusted rust-postgres to + respond with text protocol, simplifying serialization and fixing queries with + text-only types in response. + +3. Data type conversion: Considering JSON supports fewer data types than + Postgres, we perform conversions where possible, passing all other types as + strings. Key conversions include: + - postgres int2, int4, float4, float8 -> json number (NaN and Inf remain + text) + - postgres bool, null, text -> json bool, null, string + - postgres array -> json array + - postgres json and jsonb -> json object + +4. Alignment with node-postgres: To facilitate integration with js libraries, + we've matched the response structure of node-postgres, returning command tags + and column oids. Command tag capturing was added to the rust-postgres + functionality as part of this change. + ## Using SNI-based routing on localhost Now proxy determines project name from the subdomain, request to the `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so I usually use `*.localtest.me` which resolves to `127.0.0.1`. 
Now we can create self-signed certificate and play with proxy: diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 530229b3fd..6a26cea78e 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -100,9 +100,10 @@ impl CertResolver { is_default: bool, ) -> anyhow::Result<()> { let priv_key = { - let key_bytes = std::fs::read(key_path).context("TLS key file")?; - let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..]) + let key_bytes = std::fs::read(key_path) .context(format!("Failed to read TLS keys at '{key_path}'"))?; + let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..]) + .context(format!("Failed to parse TLS keys at '{key_path}'"))?; ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len()); keys.pop().map(rustls::PrivateKey).unwrap() diff --git a/proxy/src/http.rs b/proxy/src/http.rs index a544157800..5cf49b669c 100644 --- a/proxy/src/http.rs +++ b/proxy/src/http.rs @@ -3,6 +3,7 @@ //! directly relying on deps like `reqwest` (think loose coupling). pub mod server; +pub mod sql_over_http; pub mod websocket; pub use reqwest::{Request, Response, StatusCode}; diff --git a/proxy/src/http/sql_over_http.rs b/proxy/src/http/sql_over_http.rs new file mode 100644 index 0000000000..0438a82c12 --- /dev/null +++ b/proxy/src/http/sql_over_http.rs @@ -0,0 +1,603 @@ +use futures::pin_mut; +use futures::StreamExt; +use hyper::body::HttpBody; +use hyper::{Body, HeaderMap, Request}; +use pq_proto::StartupMessageParams; +use serde_json::json; +use serde_json::Map; +use serde_json::Value; +use tokio_postgres::types::Kind; +use tokio_postgres::types::Type; +use tokio_postgres::Row; +use url::Url; + +use crate::{auth, config::ProxyConfig, console}; + +#[derive(serde::Deserialize)] +struct QueryData { + query: String, + params: Vec, +} + +const APP_NAME: &str = "sql_over_http"; +const MAX_RESPONSE_SIZE: usize = 1024 * 1024; // 1 MB +const MAX_REQUEST_SIZE: u64 = 1024 * 1024; // 1 MB + +// +// Convert json non-string types to strings, so that they can be passed to Postgres +// as parameters. +// +fn json_to_pg_text(json: Vec) -> Result, serde_json::Error> { + json.iter() + .map(|value| { + match value { + Value::Null => serde_json::to_string(value), + Value::Bool(_) => serde_json::to_string(value), + Value::Number(_) => serde_json::to_string(value), + Value::Object(_) => serde_json::to_string(value), + + // no need to escape + Value::String(s) => Ok(s.to_string()), + + // special care for arrays + Value::Array(_) => json_array_to_pg_array(value), + } + }) + .collect() +} + +// +// Serialize a JSON array to a Postgres array. Contrary to the strings in the params +// in the array we need to escape the strings. Postgres is okay with arrays of form +// '{1,"2",3}'::int[], so we don't check that array holds values of the same type, leaving +// it for Postgres to check. +// +// Example of the same escaping in node-postgres: packages/pg/lib/utils.js +// +fn json_array_to_pg_array(value: &Value) -> Result { + match value { + // same + Value::Null => serde_json::to_string(value), + Value::Bool(_) => serde_json::to_string(value), + Value::Number(_) => serde_json::to_string(value), + Value::Object(_) => serde_json::to_string(value), + + // now needs to be escaped, as it is part of the array + Value::String(_) => serde_json::to_string(value), + + // recurse into array + Value::Array(arr) => { + let vals = arr + .iter() + .map(json_array_to_pg_array) + .collect::, _>>()? 
+ .join(","); + Ok(format!("{{{}}}", vals)) + } + } +} + +fn get_conn_info( + headers: &HeaderMap, + sni_hostname: Option, +) -> Result<(String, String, String, String), anyhow::Error> { + let connection_string = headers + .get("Neon-Connection-String") + .ok_or(anyhow::anyhow!("missing connection string"))? + .to_str()?; + + let connection_url = Url::parse(connection_string)?; + + let protocol = connection_url.scheme(); + if protocol != "postgres" && protocol != "postgresql" { + return Err(anyhow::anyhow!( + "connection string must start with postgres: or postgresql:" + )); + } + + let mut url_path = connection_url + .path_segments() + .ok_or(anyhow::anyhow!("missing database name"))?; + + let dbname = url_path + .next() + .ok_or(anyhow::anyhow!("invalid database name"))?; + + let username = connection_url.username(); + if username.is_empty() { + return Err(anyhow::anyhow!("missing username")); + } + + let password = connection_url + .password() + .ok_or(anyhow::anyhow!("no password"))?; + + // TLS certificate selector now based on SNI hostname, so if we are running here + // we are sure that SNI hostname is set to one of the configured domain names. + let sni_hostname = sni_hostname.ok_or(anyhow::anyhow!("no SNI hostname set"))?; + + let hostname = connection_url + .host_str() + .ok_or(anyhow::anyhow!("no host"))?; + + let host_header = headers + .get("host") + .and_then(|h| h.to_str().ok()) + .and_then(|h| h.split(':').next()); + + if hostname != sni_hostname { + return Err(anyhow::anyhow!("mismatched SNI hostname and hostname")); + } else if let Some(h) = host_header { + if h != hostname { + return Err(anyhow::anyhow!("mismatched host header and hostname")); + } + } + + Ok(( + username.to_owned(), + dbname.to_owned(), + hostname.to_owned(), + password.to_owned(), + )) +} + +// TODO: return different http error codes +pub async fn handle( + config: &'static ProxyConfig, + request: Request, + sni_hostname: Option, +) -> anyhow::Result { + // + // Determine the destination and connection params + // + let headers = request.headers(); + let (username, dbname, hostname, password) = get_conn_info(headers, sni_hostname)?; + let credential_params = StartupMessageParams::new([ + ("user", &username), + ("database", &dbname), + ("application_name", APP_NAME), + ]); + + // + // Wake up the destination if needed. Code here is a bit involved because + // we reuse the code from the usual proxy and we need to prepare few structures + // that this code expects. 
+ // + let tls = config.tls_config.as_ref(); + let common_names = tls.and_then(|tls| tls.common_names.clone()); + let creds = config + .auth_backend + .as_ref() + .map(|_| auth::ClientCredentials::parse(&credential_params, Some(&hostname), common_names)) + .transpose()?; + let extra = console::ConsoleReqExtra { + session_id: uuid::Uuid::new_v4(), + application_name: Some(APP_NAME), + }; + let node = creds.wake_compute(&extra).await?.expect("msg"); + let conf = node.value.config; + let port = *conf.get_ports().first().expect("no port"); + let host = match conf.get_hosts().first().expect("no host") { + tokio_postgres::config::Host::Tcp(host) => host, + tokio_postgres::config::Host::Unix(_) => { + return Err(anyhow::anyhow!("unix socket is not supported")); + } + }; + + let request_content_length = match request.body().size_hint().upper() { + Some(v) => v, + None => MAX_REQUEST_SIZE + 1, + }; + + if request_content_length > MAX_REQUEST_SIZE { + return Err(anyhow::anyhow!( + "request is too large (max {MAX_REQUEST_SIZE} bytes)" + )); + } + + // + // Read the query and query params from the request body + // + let body = hyper::body::to_bytes(request.into_body()).await?; + let QueryData { query, params } = serde_json::from_slice(&body)?; + let query_params = json_to_pg_text(params)?; + + // + // Connenct to the destination + // + let (client, connection) = tokio_postgres::Config::new() + .host(host) + .port(port) + .user(&username) + .password(&password) + .dbname(&dbname) + .max_backend_message_size(MAX_RESPONSE_SIZE) + .connect(tokio_postgres::NoTls) + .await?; + + tokio::spawn(async move { + if let Err(e) = connection.await { + eprintln!("connection error: {}", e); + } + }); + + // + // Now execute the query and return the result + // + let row_stream = client.query_raw_txt(query, query_params).await?; + + // Manually drain the stream into a vector to leave row_stream hanging + // around to get a command tag. Also check that the response is not too + // big. 
+ pin_mut!(row_stream); + let mut rows: Vec = Vec::new(); + let mut curret_size = 0; + while let Some(row) = row_stream.next().await { + let row = row?; + curret_size += row.body_len(); + rows.push(row); + if curret_size > MAX_RESPONSE_SIZE { + return Err(anyhow::anyhow!("response too large")); + } + } + + // grab the command tag and number of rows affected + let command_tag = row_stream.command_tag().unwrap_or_default(); + let mut command_tag_split = command_tag.split(' '); + let command_tag_name = command_tag_split.next().unwrap_or_default(); + let command_tag_count = if command_tag_name == "INSERT" { + // INSERT returns OID first and then number of rows + command_tag_split.nth(1) + } else { + // other commands return number of rows (if any) + command_tag_split.next() + } + .and_then(|s| s.parse::().ok()); + + let fields = if !rows.is_empty() { + rows[0] + .columns() + .iter() + .map(|c| { + json!({ + "name": Value::String(c.name().to_owned()), + "dataTypeID": Value::Number(c.type_().oid().into()), + }) + }) + .collect::>() + } else { + Vec::new() + }; + + // convert rows to JSON + let rows = rows + .iter() + .map(pg_text_row_to_json) + .collect::, _>>()?; + + // resulting JSON format is based on the format of node-postgres result + Ok(json!({ + "command": command_tag_name, + "rowCount": command_tag_count, + "rows": rows, + "fields": fields, + })) +} + +// +// Convert postgres row with text-encoded values to JSON object +// +pub fn pg_text_row_to_json(row: &Row) -> Result { + let res = row + .columns() + .iter() + .enumerate() + .map(|(i, column)| { + let name = column.name(); + let pg_value = row.as_text(i)?; + let json_value = pg_text_to_json(pg_value, column.type_())?; + Ok((name.to_string(), json_value)) + }) + .collect::, anyhow::Error>>()?; + + Ok(Value::Object(res)) +} + +// +// Convert postgres text-encoded value to JSON value +// +pub fn pg_text_to_json(pg_value: Option<&str>, pg_type: &Type) -> Result { + if let Some(val) = pg_value { + if val == "NULL" { + return Ok(Value::Null); + } + + if let Kind::Array(elem_type) = pg_type.kind() { + return pg_array_parse(val, elem_type); + } + + match *pg_type { + Type::BOOL => Ok(Value::Bool(val == "t")), + Type::INT2 | Type::INT4 => { + let val = val.parse::()?; + Ok(Value::Number(serde_json::Number::from(val))) + } + Type::FLOAT4 | Type::FLOAT8 => { + let fval = val.parse::()?; + let num = serde_json::Number::from_f64(fval); + if let Some(num) = num { + Ok(Value::Number(num)) + } else { + // Pass Nan, Inf, -Inf as strings + // JS JSON.stringify() does converts them to null, but we + // want to preserve them, so we pass them as strings + Ok(Value::String(val.to_string())) + } + } + Type::JSON | Type::JSONB => Ok(serde_json::from_str(val)?), + _ => Ok(Value::String(val.to_string())), + } + } else { + Ok(Value::Null) + } +} + +// +// Parse postgres array into JSON array. +// +// This is a bit involved because we need to handle nested arrays and quoted +// values. Unlike postgres we don't check that all nested arrays have the same +// dimensions, we just return them as is. 
+// +fn pg_array_parse(pg_array: &str, elem_type: &Type) -> Result { + _pg_array_parse(pg_array, elem_type, false).map(|(v, _)| v) +} + +fn _pg_array_parse( + pg_array: &str, + elem_type: &Type, + nested: bool, +) -> Result<(Value, usize), anyhow::Error> { + let mut pg_array_chr = pg_array.char_indices(); + let mut level = 0; + let mut quote = false; + let mut entries: Vec = Vec::new(); + let mut entry = String::new(); + + // skip bounds decoration + if let Some('[') = pg_array.chars().next() { + for (_, c) in pg_array_chr.by_ref() { + if c == '=' { + break; + } + } + } + + while let Some((mut i, mut c)) = pg_array_chr.next() { + let mut escaped = false; + + if c == '\\' { + escaped = true; + (i, c) = pg_array_chr.next().unwrap(); + } + + match c { + '{' if !quote => { + level += 1; + if level > 1 { + let (res, off) = _pg_array_parse(&pg_array[i..], elem_type, true)?; + entries.push(res); + for _ in 0..off - 1 { + pg_array_chr.next(); + } + } + } + '}' => { + level -= 1; + if level == 0 { + if !entry.is_empty() { + entries.push(pg_text_to_json(Some(&entry), elem_type)?); + } + if nested { + return Ok((Value::Array(entries), i)); + } + } + } + '"' if !escaped => { + if quote { + // push even if empty + entries.push(pg_text_to_json(Some(&entry), elem_type)?); + entry = String::new(); + } + quote = !quote; + } + ',' if !quote => { + if !entry.is_empty() { + entries.push(pg_text_to_json(Some(&entry), elem_type)?); + entry = String::new(); + } + } + _ => { + entry.push(c); + } + } + } + + if level != 0 { + return Err(anyhow::anyhow!("unbalanced array")); + } + + Ok((Value::Array(entries), 0)) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_atomic_types_to_pg_params() { + let json = vec![Value::Bool(true), Value::Bool(false)]; + let pg_params = json_to_pg_text(json).unwrap(); + assert_eq!(pg_params, vec!["true", "false"]); + + let json = vec![Value::Number(serde_json::Number::from(42))]; + let pg_params = json_to_pg_text(json).unwrap(); + assert_eq!(pg_params, vec!["42"]); + + let json = vec![Value::String("foo\"".to_string())]; + let pg_params = json_to_pg_text(json).unwrap(); + assert_eq!(pg_params, vec!["foo\""]); + + let json = vec![Value::Null]; + let pg_params = json_to_pg_text(json).unwrap(); + assert_eq!(pg_params, vec!["null"]); + } + + #[test] + fn test_json_array_to_pg_array() { + // atoms and escaping + let json = "[true, false, null, 42, \"foo\", \"bar\\\"-\\\\\"]"; + let json: Value = serde_json::from_str(json).unwrap(); + let pg_params = json_to_pg_text(vec![json]).unwrap(); + assert_eq!( + pg_params, + vec!["{true,false,null,42,\"foo\",\"bar\\\"-\\\\\"}"] + ); + + // nested arrays + let json = "[[true, false], [null, 42], [\"foo\", \"bar\\\"-\\\\\"]]"; + let json: Value = serde_json::from_str(json).unwrap(); + let pg_params = json_to_pg_text(vec![json]).unwrap(); + assert_eq!( + pg_params, + vec!["{{true,false},{null,42},{\"foo\",\"bar\\\"-\\\\\"}}"] + ); + } + + #[test] + fn test_atomic_types_parse() { + assert_eq!( + pg_text_to_json(Some("foo"), &Type::TEXT).unwrap(), + json!("foo") + ); + assert_eq!(pg_text_to_json(None, &Type::TEXT).unwrap(), json!(null)); + assert_eq!(pg_text_to_json(Some("42"), &Type::INT4).unwrap(), json!(42)); + assert_eq!(pg_text_to_json(Some("42"), &Type::INT2).unwrap(), json!(42)); + assert_eq!( + pg_text_to_json(Some("42"), &Type::INT8).unwrap(), + json!("42") + ); + assert_eq!( + pg_text_to_json(Some("42.42"), &Type::FLOAT8).unwrap(), + json!(42.42) + ); + assert_eq!( + pg_text_to_json(Some("42.42"), 
&Type::FLOAT4).unwrap(), + json!(42.42) + ); + assert_eq!( + pg_text_to_json(Some("NaN"), &Type::FLOAT4).unwrap(), + json!("NaN") + ); + assert_eq!( + pg_text_to_json(Some("Infinity"), &Type::FLOAT4).unwrap(), + json!("Infinity") + ); + assert_eq!( + pg_text_to_json(Some("-Infinity"), &Type::FLOAT4).unwrap(), + json!("-Infinity") + ); + + let json: Value = + serde_json::from_str("{\"s\":\"str\",\"n\":42,\"f\":4.2,\"a\":[null,3,\"a\"]}") + .unwrap(); + assert_eq!( + pg_text_to_json( + Some(r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#), + &Type::JSONB + ) + .unwrap(), + json + ); + } + + #[test] + fn test_pg_array_parse_text() { + fn pt(pg_arr: &str) -> Value { + pg_array_parse(pg_arr, &Type::TEXT).unwrap() + } + assert_eq!( + pt(r#"{"aa\"\\\,a",cha,"bbbb"}"#), + json!(["aa\"\\,a", "cha", "bbbb"]) + ); + assert_eq!( + pt(r#"{{"foo","bar"},{"bee","bop"}}"#), + json!([["foo", "bar"], ["bee", "bop"]]) + ); + assert_eq!( + pt(r#"{{{{"foo",NULL,"bop",bup}}}}"#), + json!([[[["foo", null, "bop", "bup"]]]]) + ); + assert_eq!( + pt(r#"{{"1",2,3},{4,NULL,6},{NULL,NULL,NULL}}"#), + json!([["1", "2", "3"], ["4", null, "6"], [null, null, null]]) + ); + } + + #[test] + fn test_pg_array_parse_bool() { + fn pb(pg_arr: &str) -> Value { + pg_array_parse(pg_arr, &Type::BOOL).unwrap() + } + assert_eq!(pb(r#"{t,f,t}"#), json!([true, false, true])); + assert_eq!(pb(r#"{{t,f,t}}"#), json!([[true, false, true]])); + assert_eq!( + pb(r#"{{t,f},{f,t}}"#), + json!([[true, false], [false, true]]) + ); + assert_eq!( + pb(r#"{{t,NULL},{NULL,f}}"#), + json!([[true, null], [null, false]]) + ); + } + + #[test] + fn test_pg_array_parse_numbers() { + fn pn(pg_arr: &str, ty: &Type) -> Value { + pg_array_parse(pg_arr, ty).unwrap() + } + assert_eq!(pn(r#"{1,2,3}"#, &Type::INT4), json!([1, 2, 3])); + assert_eq!(pn(r#"{1,2,3}"#, &Type::INT2), json!([1, 2, 3])); + assert_eq!(pn(r#"{1,2,3}"#, &Type::INT8), json!(["1", "2", "3"])); + assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT4), json!([1.0, 2.0, 3.0])); + assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT8), json!([1.0, 2.0, 3.0])); + assert_eq!( + pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT4), + json!([1.1, 2.2, 3.3]) + ); + assert_eq!( + pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT8), + json!([1.1, 2.2, 3.3]) + ); + assert_eq!( + pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT4), + json!(["NaN", "Infinity", "-Infinity"]) + ); + assert_eq!( + pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT8), + json!(["NaN", "Infinity", "-Infinity"]) + ); + } + + #[test] + fn test_pg_array_with_decoration() { + fn p(pg_arr: &str) -> Value { + pg_array_parse(pg_arr, &Type::INT2).unwrap() + } + assert_eq!( + p(r#"[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}"#), + json!([[[1, 2, 3], [4, 5, 6]]]) + ); + } +} diff --git a/proxy/src/http/websocket.rs b/proxy/src/http/websocket.rs index c7676e8e14..fbb602e3d2 100644 --- a/proxy/src/http/websocket.rs +++ b/proxy/src/http/websocket.rs @@ -4,12 +4,17 @@ use crate::{ use bytes::{Buf, Bytes}; use futures::{Sink, Stream, StreamExt}; use hyper::{ - server::{accept, conn::AddrIncoming}, + server::{ + accept, + conn::{AddrIncoming, AddrStream}, + }, upgrade::Upgraded, - Body, Request, Response, StatusCode, + Body, Method, Request, Response, StatusCode, }; use hyper_tungstenite::{tungstenite::Message, HyperWebsocket, WebSocketStream}; use pin_project_lite::pin_project; +use serde_json::{json, Value}; + use std::{ convert::Infallible, future::ready, @@ -21,6 +26,7 @@ use tls_listener::TlsListener; use tokio::{ io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}, net::TcpListener, + select, }; 
use tokio_util::sync::CancellationToken; use tracing::{error, info, info_span, warn, Instrument}; @@ -30,6 +36,8 @@ use utils::http::{error::ApiError, json::json_response}; // Tracking issue: https://github.com/rust-lang/rust/issues/98407. use sync_wrapper::SyncWrapper; +use super::sql_over_http; + pin_project! { /// This is a wrapper around a [`WebSocketStream`] that /// implements [`AsyncRead`] and [`AsyncWrite`]. @@ -159,6 +167,7 @@ async fn ws_handler( config: &'static ProxyConfig, cancel_map: Arc, session_id: uuid::Uuid, + sni_hostname: Option, ) -> Result, ApiError> { let host = request .headers() @@ -181,8 +190,44 @@ async fn ws_handler( // Return the response so the spawned future can continue. Ok(response) + // TODO: that deserves a refactor as now this function also handles http json client besides websockets. + // Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead. + } else if request.uri().path() == "/sql" && request.method() == Method::POST { + let result = select! { + _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => { + Err(anyhow::anyhow!("Query timed out")) + } + response = sql_over_http::handle(config, request, sni_hostname) => { + response + } + }; + let status_code = match result { + Ok(_) => StatusCode::OK, + Err(_) => StatusCode::BAD_REQUEST, + }; + let json = match result { + Ok(r) => r, + Err(e) => { + let message = format!("{:?}", e); + let code = match e.downcast_ref::() { + Some(e) => match e.code() { + Some(e) => serde_json::to_value(e.code()).unwrap(), + None => Value::Null, + }, + None => Value::Null, + }; + json!({ "message": message, "code": code }) + } + }; + json_response(status_code, json).map(|mut r| { + r.headers_mut().insert( + "Access-Control-Allow-Origin", + hyper::http::HeaderValue::from_static("*"), + ); + r + }) } else { - json_response(StatusCode::OK, "Connect with a websocket client") + json_response(StatusCode::BAD_REQUEST, "query is not supported") } } @@ -216,20 +261,27 @@ pub async fn task_main( } }); - let make_svc = hyper::service::make_service_fn(|_stream| async move { - Ok::<_, Infallible>(hyper::service::service_fn( - move |req: Request| async move { - let cancel_map = Arc::new(CancelMap::default()); - let session_id = uuid::Uuid::new_v4(); - ws_handler(req, config, cancel_map, session_id) - .instrument(info_span!( - "ws-client", - session = format_args!("{session_id}") - )) - .await - }, - )) - }); + let make_svc = + hyper::service::make_service_fn(|stream: &tokio_rustls::server::TlsStream| { + let sni_name = stream.get_ref().1.sni_hostname().map(|s| s.to_string()); + + async move { + Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request| { + let sni_name = sni_name.clone(); + async move { + let cancel_map = Arc::new(CancelMap::default()); + let session_id = uuid::Uuid::new_v4(); + + ws_handler(req, config, cancel_map, session_id, sni_name) + .instrument(info_span!( + "ws-client", + session = format_args!("{session_id}") + )) + .await + } + })) + } + }); hyper::Server::builder(accept::from_stream(tls_listener)) .serve(make_svc) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 8ec17834ac..bde91e6783 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2042,15 +2042,19 @@ class NeonProxy(PgProtocol): proxy_port: int, http_port: int, mgmt_port: int, + external_http_port: int, auth_backend: NeonProxy.AuthBackend, metric_collection_endpoint: Optional[str] = None, 
metric_collection_interval: Optional[str] = None, ): host = "127.0.0.1" - super().__init__(dsn=auth_backend.default_conn_url, host=host, port=proxy_port) + domain = "proxy.localtest.me" # resolves to 127.0.0.1 + super().__init__(dsn=auth_backend.default_conn_url, host=domain, port=proxy_port) + self.domain = domain self.host = host self.http_port = http_port + self.external_http_port = external_http_port self.neon_binpath = neon_binpath self.test_output_dir = test_output_dir self.proxy_port = proxy_port @@ -2062,11 +2066,42 @@ class NeonProxy(PgProtocol): def start(self) -> NeonProxy: assert self._popen is None + + # generate key of it doesn't exist + crt_path = self.test_output_dir / "proxy.crt" + key_path = self.test_output_dir / "proxy.key" + + if not key_path.exists(): + r = subprocess.run( + [ + "openssl", + "req", + "-new", + "-x509", + "-days", + "365", + "-nodes", + "-text", + "-out", + str(crt_path), + "-keyout", + str(key_path), + "-subj", + "/CN=*.localtest.me", + "-addext", + "subjectAltName = DNS:*.localtest.me", + ] + ) + assert r.returncode == 0 + args = [ str(self.neon_binpath / "proxy"), *["--http", f"{self.host}:{self.http_port}"], *["--proxy", f"{self.host}:{self.proxy_port}"], *["--mgmt", f"{self.host}:{self.mgmt_port}"], + *["--wss", f"{self.host}:{self.external_http_port}"], + *["-c", str(crt_path)], + *["-k", str(key_path)], *self.auth_backend.extra_args(), ] @@ -2190,6 +2225,7 @@ def link_proxy( http_port = port_distributor.get_port() proxy_port = port_distributor.get_port() mgmt_port = port_distributor.get_port() + external_http_port = port_distributor.get_port() with NeonProxy( neon_binpath=neon_binpath, @@ -2197,6 +2233,7 @@ def link_proxy( proxy_port=proxy_port, http_port=http_port, mgmt_port=mgmt_port, + external_http_port=external_http_port, auth_backend=NeonProxy.Link(), ) as proxy: proxy.start() @@ -2224,6 +2261,7 @@ def static_proxy( proxy_port = port_distributor.get_port() mgmt_port = port_distributor.get_port() http_port = port_distributor.get_port() + external_http_port = port_distributor.get_port() with NeonProxy( neon_binpath=neon_binpath, @@ -2231,6 +2269,7 @@ def static_proxy( proxy_port=proxy_port, http_port=http_port, mgmt_port=mgmt_port, + external_http_port=external_http_port, auth_backend=NeonProxy.Postgres(auth_endpoint), ) as proxy: proxy.start() diff --git a/test_runner/regress/test_metric_collection.py b/test_runner/regress/test_metric_collection.py index 1231188896..00ea77f2e7 100644 --- a/test_runner/regress/test_metric_collection.py +++ b/test_runner/regress/test_metric_collection.py @@ -204,6 +204,7 @@ def proxy_with_metric_collector( http_port = port_distributor.get_port() proxy_port = port_distributor.get_port() mgmt_port = port_distributor.get_port() + external_http_port = port_distributor.get_port() (host, port) = httpserver_listen_address metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events" @@ -215,6 +216,7 @@ def proxy_with_metric_collector( proxy_port=proxy_port, http_port=http_port, mgmt_port=mgmt_port, + external_http_port=external_http_port, metric_collection_endpoint=metric_collection_endpoint, metric_collection_interval=metric_collection_interval, auth_backend=NeonProxy.Link(), diff --git a/test_runner/regress/test_proxy.py b/test_runner/regress/test_proxy.py index ae914e384e..6be3995714 100644 --- a/test_runner/regress/test_proxy.py +++ b/test_runner/regress/test_proxy.py @@ -1,22 +1,32 @@ +import json import subprocess +from typing import Any, List import psycopg2 import pytest +import requests 
from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres -@pytest.mark.parametrize("option_name", ["project", "endpoint"]) -def test_proxy_select_1(static_proxy: NeonProxy, option_name: str): +def test_proxy_select_1(static_proxy: NeonProxy): """ A simplest smoke test: check proxy against a local postgres instance. """ - out = static_proxy.safe_psql("select 1", options=f"{option_name}=generic-project-name") + # no SNI, deprecated `options=project` syntax (before we had several endpoint in project) + out = static_proxy.safe_psql("select 1", sslsni=0, options="project=generic-project-name") assert out[0][0] == 1 + # no SNI, new `options=endpoint` syntax + out = static_proxy.safe_psql("select 1", sslsni=0, options="endpoint=generic-project-name") + assert out[0][0] == 1 -@pytest.mark.parametrize("option_name", ["project", "endpoint"]) -def test_password_hack(static_proxy: NeonProxy, option_name: str): + # with SNI + out = static_proxy.safe_psql("select 42", host="generic-project-name.localtest.me") + assert out[0][0] == 42 + + +def test_password_hack(static_proxy: NeonProxy): """ Check the PasswordHack auth flow: an alternative to SCRAM auth for clients which can't provide the project/endpoint name via SNI or `options`. @@ -24,14 +34,16 @@ def test_password_hack(static_proxy: NeonProxy, option_name: str): user = "borat" password = "password" - static_proxy.safe_psql( - f"create role {user} with login password '{password}'", - options=f"{option_name}=irrelevant", - ) + static_proxy.safe_psql(f"create role {user} with login password '{password}'") # Note the format of `magic`! - magic = f"{option_name}=irrelevant;{password}" - static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic) + magic = f"project=irrelevant;{password}" + out = static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic) + assert out[0][0] == 1 + + magic = f"endpoint=irrelevant;{password}" + out = static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic) + assert out[0][0] == 1 # Must also check that invalid magic won't be accepted. with pytest.raises(psycopg2.OperationalError): @@ -69,52 +81,55 @@ def test_proxy_options(static_proxy: NeonProxy, option_name: str): """ options = f"{option_name}=irrelevant -cproxytest.option=value" - out = static_proxy.safe_psql("show proxytest.option", options=options) + out = static_proxy.safe_psql("show proxytest.option", options=options, sslsni=0) assert out[0][0] == "value" options = f"-c proxytest.foo=\\ str {option_name}=irrelevant" + out = static_proxy.safe_psql("show proxytest.foo", options=options, sslsni=0) + assert out[0][0] == " str" + + options = "-cproxytest.option=value" + out = static_proxy.safe_psql("show proxytest.option", options=options) + assert out[0][0] == "value" + + options = "-c proxytest.foo=\\ str" out = static_proxy.safe_psql("show proxytest.foo", options=options) assert out[0][0] == " str" -@pytest.mark.parametrize("option_name", ["project", "endpoint"]) -def test_auth_errors(static_proxy: NeonProxy, option_name: str): +def test_auth_errors(static_proxy: NeonProxy): """ Check that we throw very specific errors in some unsuccessful auth scenarios. 
""" # User does not exist with pytest.raises(psycopg2.Error) as exprinfo: - static_proxy.connect(user="pinocchio", options=f"{option_name}=irrelevant") + static_proxy.connect(user="pinocchio") text = str(exprinfo.value).strip() - assert text.endswith("password authentication failed for user 'pinocchio'") + assert text.find("password authentication failed for user 'pinocchio'") != -1 static_proxy.safe_psql( "create role pinocchio with login password 'magic'", - options=f"{option_name}=irrelevant", ) # User exists, but password is missing with pytest.raises(psycopg2.Error) as exprinfo: - static_proxy.connect(user="pinocchio", password=None, options=f"{option_name}=irrelevant") + static_proxy.connect(user="pinocchio", password=None) text = str(exprinfo.value).strip() - assert text.endswith("password authentication failed for user 'pinocchio'") + assert text.find("password authentication failed for user 'pinocchio'") != -1 # User exists, but password is wrong with pytest.raises(psycopg2.Error) as exprinfo: - static_proxy.connect(user="pinocchio", password="bad", options=f"{option_name}=irrelevant") + static_proxy.connect(user="pinocchio", password="bad") text = str(exprinfo.value).strip() - assert text.endswith("password authentication failed for user 'pinocchio'") + assert text.find("password authentication failed for user 'pinocchio'") != -1 # Finally, check that the user can connect - with static_proxy.connect( - user="pinocchio", password="magic", options=f"{option_name}=irrelevant" - ): + with static_proxy.connect(user="pinocchio", password="magic"): pass -@pytest.mark.parametrize("option_name", ["project", "endpoint"]) -def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str): +def test_forward_params_to_client(static_proxy: NeonProxy): """ Check that we forward all necessary PostgreSQL server params to client. """ @@ -140,7 +155,7 @@ def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str): where name = any(%s) """ - with static_proxy.connect(options=f"{option_name}=irrelevant") as conn: + with static_proxy.connect() as conn: with conn.cursor() as cur: cur.execute(query, (reported_params_subset,)) for name, value in cur.fetchall(): @@ -148,18 +163,65 @@ def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str): assert conn.get_parameter_status(name) == value -@pytest.mark.parametrize("option_name", ["project", "endpoint"]) @pytest.mark.timeout(5) -def test_close_on_connections_exit(static_proxy: NeonProxy, option_name: str): +def test_close_on_connections_exit(static_proxy: NeonProxy): # Open two connections, send SIGTERM, then ensure that proxy doesn't exit # until after connections close. 
- with static_proxy.connect(options=f"{option_name}=irrelevant"), static_proxy.connect( - options=f"{option_name}=irrelevant" - ): + with static_proxy.connect(), static_proxy.connect(): static_proxy.terminate() with pytest.raises(subprocess.TimeoutExpired): static_proxy.wait_for_exit(timeout=2) # Ensure we don't accept any more connections with pytest.raises(psycopg2.OperationalError): - static_proxy.connect(options=f"{option_name}=irrelevant") + static_proxy.connect() static_proxy.wait_for_exit() + + +def test_sql_over_http(static_proxy: NeonProxy): + static_proxy.safe_psql("create role http with login password 'http' superuser") + + def q(sql: str, params: List[Any] = []) -> Any: + connstr = f"postgresql://http:http@{static_proxy.domain}:{static_proxy.proxy_port}/postgres" + response = requests.post( + f"https://{static_proxy.domain}:{static_proxy.external_http_port}/sql", + data=json.dumps({"query": sql, "params": params}), + headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr}, + verify=str(static_proxy.test_output_dir / "proxy.crt"), + ) + assert response.status_code == 200 + return response.json() + + rows = q("select 42 as answer")["rows"] + assert rows == [{"answer": 42}] + + rows = q("select $1 as answer", [42])["rows"] + assert rows == [{"answer": "42"}] + + rows = q("select $1 * 1 as answer", [42])["rows"] + assert rows == [{"answer": 42}] + + rows = q("select $1::int[] as answer", [[1, 2, 3]])["rows"] + assert rows == [{"answer": [1, 2, 3]}] + + rows = q("select $1::json->'a' as answer", [{"a": {"b": 42}}])["rows"] + assert rows == [{"answer": {"b": 42}}] + + rows = q("select * from pg_class limit 1")["rows"] + assert len(rows) == 1 + + res = q("create table t(id serial primary key, val int)") + assert res["command"] == "CREATE" + assert res["rowCount"] is None + + res = q("insert into t(val) values (10), (20), (30) returning id") + assert res["command"] == "INSERT" + assert res["rowCount"] == 3 + assert res["rows"] == [{"id": 1}, {"id": 2}, {"id": 3}] + + res = q("select * from t") + assert res["command"] == "SELECT" + assert res["rowCount"] == 3 + + res = q("drop table t") + assert res["command"] == "DROP" + assert res["rowCount"] is None From 00f7fc324d44dfd16001cfa1b0b01a1c534ef0e0 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 23 May 2023 21:16:12 +0200 Subject: [PATCH 4/8] tenant_map_insert: don't expose the vacant entry to the closure (#4316) This tightens up the API a little. Byproduct of some refactoring work that I'm doing right now. --- pageserver/src/tenant/mgr.rs | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 1542d34a66..53d69a15dc 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -278,7 +278,7 @@ pub async fn create_tenant( remote_storage: Option, ctx: &RequestContext, ) -> Result, TenantMapInsertError> { - tenant_map_insert(tenant_id, |vacant_entry| { + tenant_map_insert(tenant_id, || { // We're holding the tenants lock in write mode while doing local IO. // If this section ever becomes contentious, introduce a new `TenantState::Creating` // and do the work in that state. 
@@ -296,7 +296,6 @@ pub async fn create_tenant( tenant_id == crated_tenant_id, "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {crated_tenant_id})", ); - vacant_entry.insert(Arc::clone(&created_tenant)); Ok(created_tenant) }).await } @@ -408,7 +407,7 @@ pub async fn load_tenant( remote_storage: Option, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { - tenant_map_insert(tenant_id, |vacant_entry| { + tenant_map_insert(tenant_id, || { let tenant_path = conf.tenant_path(&tenant_id); let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(tenant_id); if tenant_ignore_mark.exists() { @@ -421,9 +420,9 @@ pub async fn load_tenant( format!("Failed to schedule tenant processing in path {tenant_path:?}") })?; - vacant_entry.insert(new_tenant); - Ok(()) - }).await + Ok(new_tenant) + }).await?; + Ok(()) } pub async fn ignore_tenant( @@ -476,7 +475,7 @@ pub async fn attach_tenant( remote_storage: GenericRemoteStorage, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { - tenant_map_insert(tenant_id, |vacant_entry| { + tenant_map_insert(tenant_id, || { let tenant_dir = create_tenant_files(conf, tenant_conf, tenant_id, CreateTenantFilesMode::Attach)?; // TODO: tenant directory remains on disk if we bail out from here on. // See https://github.com/neondatabase/neon/issues/4233 @@ -497,10 +496,10 @@ pub async fn attach_tenant( tenant_id == attached_tenant_id, "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {attached_tenant_id})", ); - vacant_entry.insert(Arc::clone(&attached_tenant)); - Ok(()) + Ok(attached_tenant) }) - .await + .await?; + Ok(()) } #[derive(Debug, thiserror::Error)] @@ -521,12 +520,12 @@ pub enum TenantMapInsertError { /// /// NB: the closure should return quickly because the current implementation of tenants map /// serializes access through an `RwLock`. -async fn tenant_map_insert( +async fn tenant_map_insert( tenant_id: TenantId, insert_fn: F, -) -> Result +) -> Result, TenantMapInsertError> where - F: FnOnce(hash_map::VacantEntry>) -> anyhow::Result, + F: FnOnce() -> anyhow::Result>, { let mut guard = TENANTS.write().await; let m = match &mut *guard { @@ -539,8 +538,11 @@ where tenant_id, e.get().current_state(), )), - hash_map::Entry::Vacant(v) => match insert_fn(v) { - Ok(v) => Ok(v), + hash_map::Entry::Vacant(v) => match insert_fn() { + Ok(tenant) => { + v.insert(tenant.clone()); + Ok(tenant) + } Err(e) => Err(TenantMapInsertError::Closure(e)), }, } From 7f1973f8acd55b472265d3161742ab617a9b1976 Mon Sep 17 00:00:00 2001 From: sharnoff Date: Tue, 23 May 2023 15:20:20 -0700 Subject: [PATCH 5/8] bump vm-builder, use Neon-specific version (#4155) In the v0.6.0 release, vm-builder was changed to be Neon-specific, so it's handling all the stuff that Dockerfile.vm-compute-node used to do. This commit bumps vm-builder to v0.7.3-alpha3. 
--- .github/workflows/build_and_test.yml | 13 ++---- Dockerfile.vm-compute-node | 70 ---------------------------- 2 files changed, 5 insertions(+), 78 deletions(-) delete mode 100644 Dockerfile.vm-compute-node diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 564251ef8f..845a21ad0e 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -777,7 +777,7 @@ jobs: run: shell: sh -eu {0} env: - VM_BUILDER_VERSION: v0.4.6 + VM_BUILDER_VERSION: v0.7.3-alpha3 steps: - name: Checkout @@ -787,21 +787,18 @@ jobs: - name: Downloading vm-builder run: | - curl -L https://github.com/neondatabase/neonvm/releases/download/$VM_BUILDER_VERSION/vm-builder -o vm-builder + curl -fL https://github.com/neondatabase/autoscaling/releases/download/$VM_BUILDER_VERSION/vm-builder -o vm-builder chmod +x vm-builder + # Note: we need a separate pull step here because otherwise vm-builder will try to pull, and + # it won't have the proper authentication (written at v0.6.0) - name: Pulling compute-node image run: | docker pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} - - name: Building VM compute-node rootfs - run: | - docker build -t temp-vm-compute-node --build-arg SRC_IMAGE=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -f Dockerfile.vm-compute-node . - - name: Build vm image run: | - # note: as of 2023-01-12, vm-builder requires a trailing ":latest" for local images - ./vm-builder -use-inittab -src=temp-vm-compute-node:latest -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} + ./vm-builder -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} - name: Pushing vm-compute-node image run: | diff --git a/Dockerfile.vm-compute-node b/Dockerfile.vm-compute-node deleted file mode 100644 index aabb3c9953..0000000000 --- a/Dockerfile.vm-compute-node +++ /dev/null @@ -1,70 +0,0 @@ -# Note: this file *mostly* just builds on Dockerfile.compute-node - -ARG SRC_IMAGE -ARG VM_INFORMANT_VERSION=v0.1.14 -# on libcgroup update, make sure to check bootstrap.sh for changes -ARG LIBCGROUP_VERSION=v2.0.3 - -# Pull VM informant, to copy from later -FROM neondatabase/vm-informant:$VM_INFORMANT_VERSION as informant - -# Build cgroup-tools -# -# At time of writing (2023-03-14), debian bullseye has a version of cgroup-tools (technically -# libcgroup) that doesn't support cgroup v2 (version 0.41-11). Unfortunately, the vm-informant -# requires cgroup v2, so we'll build cgroup-tools ourselves. 
-FROM debian:bullseye-slim as libcgroup-builder -ARG LIBCGROUP_VERSION - -RUN set -exu \ - && apt update \ - && apt install --no-install-recommends -y \ - git \ - ca-certificates \ - automake \ - cmake \ - make \ - gcc \ - byacc \ - flex \ - libtool \ - libpam0g-dev \ - && git clone --depth 1 -b $LIBCGROUP_VERSION https://github.com/libcgroup/libcgroup \ - && INSTALL_DIR="/libcgroup-install" \ - && mkdir -p "$INSTALL_DIR/bin" "$INSTALL_DIR/include" \ - && cd libcgroup \ - # extracted from bootstrap.sh, with modified flags: - && (test -d m4 || mkdir m4) \ - && autoreconf -fi \ - && rm -rf autom4te.cache \ - && CFLAGS="-O3" ./configure --prefix="$INSTALL_DIR" --sysconfdir=/etc --localstatedir=/var --enable-opaque-hierarchy="name=systemd" \ - # actually build the thing... - && make install - -# Combine, starting from non-VM compute node image. -FROM $SRC_IMAGE as base - -# Temporarily set user back to root so we can run adduser, set inittab -USER root -RUN adduser vm-informant --disabled-password --no-create-home - -RUN set -e \ - && rm -f /etc/inittab \ - && touch /etc/inittab - -RUN set -e \ - && echo "::sysinit:cgconfigparser -l /etc/cgconfig.conf -s 1664" >> /etc/inittab \ - && CONNSTR="dbname=postgres user=cloud_admin sslmode=disable" \ - && ARGS="--auto-restart --cgroup=neon-postgres --pgconnstr=\"$CONNSTR\"" \ - && echo "::respawn:su vm-informant -c '/usr/local/bin/vm-informant $ARGS'" >> /etc/inittab - -USER postgres - -ADD vm-cgconfig.conf /etc/cgconfig.conf -COPY --from=informant /usr/bin/vm-informant /usr/local/bin/vm-informant - -COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/ -COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/ -COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/ - -ENTRYPOINT ["/usr/sbin/cgexec", "-g", "*:neon-postgres", "/usr/local/bin/compute_ctl"] From 417f37b2e81b92e94c660b392a57cc95c73152f3 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 24 May 2023 08:01:41 +0300 Subject: [PATCH 6/8] Pass set of wanted image layers from GC to compaction (#3673) ## Describe your changes Right now the only criteria for image layer generation is number of delta layer since last image layer. If we have "stairs" layout of delta layers (see link below) then it can happen that there a lot of old delta layers which can not be reclaimed by GC because are not fully covered with image layers. This PR constructs list of "wanted" image layers in GC (which image layers are needed to be able to remove old layers) and pass this list to compaction task which performs generation of image layers. So right now except deltas count criteria we also take in account "wishes" of GC. ## Issue ticket number and link See https://neondb.slack.com/archives/C033RQ5SPDH/p1676914249982519 ## Checklist before requesting a review - [ ] I have performed a self-review of my code. - [ ] If it is a core feature, I have added thorough tests. - [ ] Do we need to implement analytics? if so did you add the relevant metrics to the dashboard? - [ ] If this PR requires public announcement, mark it with /release-notes label and add several sentences in this section. 
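
To make the GC -> compaction hand-off concrete, here is a minimal sketch built on the `KeySpace` helpers added in `pageserver/src/keyspace.rs`; the function names and the exact wiring in `timeline.rs` are illustrative, not the code in this patch:

```rust
use std::ops::Range;
// Assumed imports: these are the types added/used elsewhere in this patch.
use crate::keyspace::{KeySpace, KeySpaceRandomAccum};
use crate::repository::Key;

// GC side: accumulate the key ranges whose old delta layers could not be
// dropped because no newer image layer covers them yet.
fn collect_wanted_image_ranges(uncovered: impl IntoIterator<Item = Range<Key>>) -> KeySpace {
    let mut wanted = KeySpaceRandomAccum::new();
    for range in uncovered {
        wanted.add_range(range);
    }
    wanted.to_keyspace()
}

// Compaction side: create an image layer for a partition either by the old
// delta-count criterion or because GC flagged an overlapping range as wanted.
fn want_image_layer(
    wanted: &KeySpace,
    partition: &Range<Key>,
    delta_count: usize,
    threshold: usize,
) -> bool {
    delta_count >= threshold || wanted.overlaps(partition)
}
```

The delta-count trigger stays as it is; the GC wish list is only an additional reason to materialize an image layer for a partition.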
--------- Co-authored-by: Joonas Koivunen Co-authored-by: Heikki Linnakangas --- pageserver/src/keyspace.rs | 237 +++++++++++++++++++- pageserver/src/tenant/timeline.rs | 61 ++++- test_runner/performance/test_gc_feedback.py | 76 +++++++ 3 files changed, 370 insertions(+), 4 deletions(-) create mode 100644 test_runner/performance/test_gc_feedback.py diff --git a/pageserver/src/keyspace.rs b/pageserver/src/keyspace.rs index 64024a2d8d..20e6df9c7b 100644 --- a/pageserver/src/keyspace.rs +++ b/pageserver/src/keyspace.rs @@ -5,7 +5,7 @@ use std::ops::Range; /// /// Represents a set of Keys, in a compact form. /// -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct KeySpace { /// Contiguous ranges of keys that belong to the key space. In key order, /// and with no overlap. @@ -61,6 +61,18 @@ impl KeySpace { KeyPartitioning { parts } } + + /// + /// Check if key space contains overlapping range + /// + pub fn overlaps(&self, range: &Range) -> bool { + match self.ranges.binary_search_by_key(&range.end, |r| r.start) { + Ok(0) => false, + Err(0) => false, + Ok(index) => self.ranges[index - 1].end > range.start, + Err(index) => self.ranges[index - 1].end > range.start, + } + } } /// @@ -129,3 +141,226 @@ impl KeySpaceAccum { } } } + +/// +/// A helper object, to collect a set of keys and key ranges into a KeySpace +/// object. Key ranges may be inserted in any order and can overlap. +/// +#[derive(Clone, Debug, Default)] +pub struct KeySpaceRandomAccum { + ranges: Vec>, +} + +impl KeySpaceRandomAccum { + pub fn new() -> Self { + Self { ranges: Vec::new() } + } + + pub fn add_key(&mut self, key: Key) { + self.add_range(singleton_range(key)) + } + + pub fn add_range(&mut self, range: Range) { + self.ranges.push(range); + } + + pub fn to_keyspace(mut self) -> KeySpace { + let mut ranges = Vec::new(); + if !self.ranges.is_empty() { + self.ranges.sort_by_key(|r| r.start); + let mut start = self.ranges.first().unwrap().start; + let mut end = self.ranges.first().unwrap().end; + for r in self.ranges { + assert!(r.start >= start); + if r.start > end { + ranges.push(start..end); + start = r.start; + end = r.end; + } else if r.end > end { + end = r.end; + } + } + ranges.push(start..end); + } + KeySpace { ranges } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fmt::Write; + + // Helper function to create a key range. + // + // Make the tests below less verbose. 
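+    //
+    // `kr(a..b)` turns the integer endpoints into keys via `Key::from_i128`, so the
+    // test cases below can be written with plain integer literals.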
+ fn kr(irange: Range) -> Range { + Key::from_i128(irange.start)..Key::from_i128(irange.end) + } + + #[allow(dead_code)] + fn dump_keyspace(ks: &KeySpace) { + for r in ks.ranges.iter() { + println!(" {}..{}", r.start.to_i128(), r.end.to_i128()); + } + } + + fn assert_ks_eq(actual: &KeySpace, expected: Vec>) { + if actual.ranges != expected { + let mut msg = String::new(); + + writeln!(msg, "expected:").unwrap(); + for r in &expected { + writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap(); + } + writeln!(msg, "got:").unwrap(); + for r in &actual.ranges { + writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap(); + } + panic!("{}", msg); + } + } + + #[test] + fn keyspace_add_range() { + // two separate ranges + // + // ##### + // ##### + let mut ks = KeySpaceRandomAccum::default(); + ks.add_range(kr(0..10)); + ks.add_range(kr(20..30)); + assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]); + + // two separate ranges, added in reverse order + // + // ##### + // ##### + let mut ks = KeySpaceRandomAccum::default(); + ks.add_range(kr(20..30)); + ks.add_range(kr(0..10)); + + // add range that is adjacent to the end of an existing range + // + // ##### + // ##### + ks.add_range(kr(0..10)); + ks.add_range(kr(10..30)); + assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]); + + // add range that is adjacent to the start of an existing range + // + // ##### + // ##### + let mut ks = KeySpaceRandomAccum::default(); + ks.add_range(kr(10..30)); + ks.add_range(kr(0..10)); + assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]); + + // add range that overlaps with the end of an existing range + // + // ##### + // ##### + let mut ks = KeySpaceRandomAccum::default(); + ks.add_range(kr(0..10)); + ks.add_range(kr(5..30)); + assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]); + + // add range that overlaps with the start of an existing range + // + // ##### + // ##### + let mut ks = KeySpaceRandomAccum::default(); + ks.add_range(kr(5..30)); + ks.add_range(kr(0..10)); + assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]); + + // add range that is fully covered by an existing range + // + // ######### + // ##### + let mut ks = KeySpaceRandomAccum::default(); + ks.add_range(kr(0..30)); + ks.add_range(kr(10..20)); + assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]); + + // add range that extends an existing range from both ends + // + // ##### + // ######### + let mut ks = KeySpaceRandomAccum::default(); + ks.add_range(kr(10..20)); + ks.add_range(kr(0..30)); + assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]); + + // add a range that overlaps with two existing ranges, joining them + // + // ##### ##### + // ####### + let mut ks = KeySpaceRandomAccum::default(); + ks.add_range(kr(0..10)); + ks.add_range(kr(20..30)); + ks.add_range(kr(5..25)); + assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]); + } + + #[test] + fn keyspace_overlaps() { + let mut ks = KeySpaceRandomAccum::default(); + ks.add_range(kr(10..20)); + ks.add_range(kr(30..40)); + let ks = ks.to_keyspace(); + + // ##### ##### + // xxxx + assert!(!ks.overlaps(&kr(0..5))); + + // ##### ##### + // xxxx + assert!(!ks.overlaps(&kr(5..9))); + + // ##### ##### + // xxxx + assert!(!ks.overlaps(&kr(5..10))); + + // ##### ##### + // xxxx + assert!(ks.overlaps(&kr(5..11))); + + // ##### ##### + // xxxx + assert!(ks.overlaps(&kr(10..15))); + + // ##### ##### + // xxxx + assert!(ks.overlaps(&kr(15..20))); + + // ##### ##### + // xxxx + assert!(ks.overlaps(&kr(15..25))); + + // ##### ##### + // xxxx + assert!(!ks.overlaps(&kr(22..28))); + + // 
##### ##### + // xxxx + assert!(!ks.overlaps(&kr(25..30))); + + // ##### ##### + // xxxx + assert!(ks.overlaps(&kr(35..35))); + + // ##### ##### + // xxxx + assert!(!ks.overlaps(&kr(40..45))); + + // ##### ##### + // xxxx + assert!(!ks.overlaps(&kr(45..50))); + + // ##### ##### + // xxxxxxxxxxx + assert!(ks.overlaps(&kr(0..30))); // XXXXX This fails currently! + } +} diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index c47f4444f5..3c951c1188 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -22,8 +22,7 @@ use tracing::*; use utils::id::TenantTimelineId; use std::cmp::{max, min, Ordering}; -use std::collections::BinaryHeap; -use std::collections::HashMap; +use std::collections::{BinaryHeap, HashMap}; use std::fs; use std::ops::{Deref, Range}; use std::path::{Path, PathBuf}; @@ -48,7 +47,7 @@ use crate::tenant::{ }; use crate::config::PageServerConf; -use crate::keyspace::{KeyPartitioning, KeySpace}; +use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum}; use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS}; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key}; @@ -123,6 +122,17 @@ pub struct Timeline { pub(super) layers: RwLock>, + /// Set of key ranges which should be covered by image layers to + /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. + /// It is used by compaction task when it checks if new image layer should be created. + /// Newly created image layer doesn't help to remove the delta layer, until the + /// newly created image layer falls off the PITR horizon. So on next GC cycle, + /// gc_timeline may still want the new image layer to be created. To avoid redundant + /// image layers creation we should check if image layer exists but beyond PITR horizon. + /// This is why we need remember GC cutoff LSN. + /// + wanted_image_layers: Mutex>, + last_freeze_at: AtomicLsn, // Atomic would be more appropriate here. last_freeze_ts: RwLock, @@ -1354,6 +1364,7 @@ impl Timeline { tenant_id, pg_version, layers: RwLock::new(LayerMap::default()), + wanted_image_layers: Mutex::new(None), walredo_mgr, walreceiver, @@ -2904,6 +2915,30 @@ impl Timeline { let layers = self.layers.read().unwrap(); let mut max_deltas = 0; + { + let wanted_image_layers = self.wanted_image_layers.lock().unwrap(); + if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers { + let img_range = + partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end; + if wanted.overlaps(&img_range) { + // + // gc_timeline only pays attention to image layers that are older than the GC cutoff, + // but create_image_layers creates image layers at last-record-lsn. + // So it's possible that gc_timeline wants a new image layer to be created for a key range, + // but the range is already covered by image layers at more recent LSNs. Before we + // create a new image layer, check if the range is already covered at more recent LSNs. + if !layers + .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))? 
+ { + debug!( + "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})", + img_range.start, img_range.end, cutoff_lsn, lsn + ); + return Ok(true); + } + } + } + } for part_range in &partition.ranges { let image_coverage = layers.image_coverage(part_range, lsn)?; @@ -3023,6 +3058,12 @@ impl Timeline { image_layers.push(image_layer); } } + // All layers that the GC wanted us to create have now been created. + // + // It's possible that another GC cycle happened while we were compacting, and added + // something new to wanted_image_layers, and we now clear that before processing it. + // That's OK, because the next GC iteration will put it back in. + *self.wanted_image_layers.lock().unwrap() = None; // Sync the new layer to disk before adding it to the layer map, to make sure // we don't garbage collect something based on the new layer, before it has @@ -3720,6 +3761,7 @@ impl Timeline { } let mut layers_to_remove = Vec::new(); + let mut wanted_image_layers = KeySpaceRandomAccum::default(); // Scan all layers in the timeline (remote or on-disk). // @@ -3803,6 +3845,15 @@ impl Timeline { "keeping {} because it is the latest layer", l.filename().file_name() ); + // Collect delta key ranges that need image layers to allow garbage + // collecting the layers. + // It is not so obvious whether we need to propagate information only about + // delta layers. Image layers can form "stairs" preventing old image from been deleted. + // But image layers are in any case less sparse than delta layers. Also we need some + // protection from replacing recent image layers with new one after each GC iteration. + if l.is_incremental() && !LayerMap::is_l0(&*l) { + wanted_image_layers.add_range(l.get_key_range()); + } result.layers_not_updated += 1; continue 'outer; } @@ -3815,6 +3866,10 @@ impl Timeline { ); layers_to_remove.push(Arc::clone(&l)); } + self.wanted_image_layers + .lock() + .unwrap() + .replace((new_gc_cutoff, wanted_image_layers.to_keyspace())); let mut updates = layers.batch_update(); if !layers_to_remove.is_empty() { diff --git a/test_runner/performance/test_gc_feedback.py b/test_runner/performance/test_gc_feedback.py new file mode 100644 index 0000000000..f93b560d8e --- /dev/null +++ b/test_runner/performance/test_gc_feedback.py @@ -0,0 +1,76 @@ +import pytest +from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnvBuilder + + +@pytest.mark.timeout(10000) +def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker): + """ + Test that GC is able to collect all old layers even if them are forming + "stairs" and there are not three delta layers since last image layer. + + Information about image layers needed to collect old layers should + be propagated by GC to compaction task which should take in in account + when make a decision which new image layers needs to be created. 
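+
+    The test disables the periodic GC and compaction loops, generates update-heavy
+    churn so that delta layers pile up in a "stairs" pattern, then runs GC and
+    compaction by hand (a second round checks that no excessive image layers are
+    produced) and records the physical/logical size ratio as the benchmark metric.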
+ """ + env = neon_env_builder.init_start() + client = env.pageserver.http_client() + + tenant_id, _ = env.neon_cli.create_tenant( + conf={ + # disable default GC and compaction + "gc_period": "1000 m", + "compaction_period": "0 s", + "gc_horizon": f"{1024 ** 2}", + "checkpoint_distance": f"{1024 ** 2}", + "compaction_target_size": f"{1024 ** 2}", + # set PITR interval to be small, so we can do GC + "pitr_interval": "10 s", + # "compaction_threshold": "3", + # "image_creation_threshold": "2", + } + ) + endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) + timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0] + n_steps = 10 + n_update_iters = 100 + step_size = 10000 + with endpoint.cursor() as cur: + cur.execute("SET statement_timeout='1000s'") + cur.execute( + "CREATE TABLE t(step bigint, count bigint default 0, payload text default repeat(' ', 100)) with (fillfactor=50)" + ) + cur.execute("CREATE INDEX ON t(step)") + # In each step, we insert 'step_size' new rows, and update the newly inserted rows + # 'n_update_iters' times. This creates a lot of churn and generates lots of WAL at the end of the table, + # without modifying the earlier parts of the table. + for step in range(n_steps): + cur.execute(f"INSERT INTO t (step) SELECT {step} FROM generate_series(1, {step_size})") + for i in range(n_update_iters): + cur.execute(f"UPDATE t set count=count+1 where step = {step}") + cur.execute("vacuum t") + + # cur.execute("select pg_table_size('t')") + # logical_size = cur.fetchone()[0] + logical_size = client.timeline_detail(tenant_id, timeline_id)["current_logical_size"] + log.info(f"Logical storage size {logical_size}") + + client.timeline_checkpoint(tenant_id, timeline_id) + + # Do compaction and GC + client.timeline_gc(tenant_id, timeline_id, 0) + client.timeline_compact(tenant_id, timeline_id) + # One more iteration to check that no excessive image layers are generated + client.timeline_gc(tenant_id, timeline_id, 0) + client.timeline_compact(tenant_id, timeline_id) + + physical_size = client.timeline_detail(tenant_id, timeline_id)["current_physical_size"] + log.info(f"Physical storage size {physical_size}") + + MB = 1024 * 1024 + zenbenchmark.record("logical_size", logical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER) + zenbenchmark.record("physical_size", physical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER) + zenbenchmark.record( + "physical/logical ratio", physical_size / logical_size, "", MetricReport.LOWER_IS_BETTER + ) From c200ebc09617569f483ce0fccf7646b7267268d8 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 22 May 2023 19:17:08 +0400 Subject: [PATCH 7/8] proxy: log endpoint name everywhere. Checking out proxy logs for the endpoint is a frequent (often first) operation during user issues investigation; let's remove endpoint id -> session id mapping annoying extra step here. --- proxy/src/auth/backend.rs | 10 ++++++++++ proxy/src/proxy.rs | 3 +++ 2 files changed, 13 insertions(+) diff --git a/proxy/src/auth/backend.rs b/proxy/src/auth/backend.rs index 18bc80d523..9322e4f9ff 100644 --- a/proxy/src/auth/backend.rs +++ b/proxy/src/auth/backend.rs @@ -139,6 +139,16 @@ async fn auth_quirks( } impl BackendType<'_, ClientCredentials<'_>> { + /// Get compute endpoint name from the credentials. 
+ pub fn get_endpoint(&self) -> Option { + use BackendType::*; + + match self { + Console(_, creds) => creds.project.clone(), + Postgres(_, creds) => creds.project.clone(), + Link(_) => Some("link".to_owned()), + } + } /// Authenticate the client via the requested backend, possibly using credentials. #[tracing::instrument(fields(allow_cleartext = allow_cleartext), skip_all)] pub async fn authenticate( diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index f3d3524d30..cf2dd000db 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -455,6 +455,9 @@ impl<'a, S> Client<'a, S> { impl Client<'_, S> { /// Let the client authenticate and connect to the designated compute node. + // Instrumentation logs endpoint name everywhere. Doesn't work for link + // auth; strictly speaking we don't know endpoint name in its case. + #[tracing::instrument(name = "", fields(ep = self.creds.get_endpoint().unwrap_or("".to_owned())), skip_all)] async fn connect_to_db( self, session: cancellation::Session<'_>, From f3769d45ae4180e8cf4a127ae0f82ea82dd36d39 Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 24 May 2023 08:15:39 +0300 Subject: [PATCH 8/8] chore: upgrade tokio to 1.28.1 (#4294) no major changes, but this is the most recent LTS release and will be required by #4292. --- Cargo.lock | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d63ebd99d..2223453a08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4271,9 +4271,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.27.0" +version = "1.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001" +checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105" dependencies = [ "autocfg", "bytes", @@ -4284,7 +4284,7 @@ dependencies = [ "signal-hook-registry", "socket2 0.4.9", "tokio-macros", - "windows-sys 0.45.0", + "windows-sys 0.48.0", ] [[package]] @@ -4299,9 +4299,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote",