Compare commits

..

9 Commits

Author SHA1 Message Date
Arseny Sher
32d4e4914a Add wait events without query to metric. 2023-11-16 23:56:04 +01:00
Arseny Sher
d4d577e7ff Add query to pg_wait_sampling metric 2023-11-16 22:42:08 +01:00
Arseny Sher
f552aa05fa Add pg_wait_sampling metric for vms. 2023-11-16 22:04:29 +01:00
Arthur Petukhovsky
779badb7c5 Join postgres multiline logs 2023-11-16 20:54:02 +00:00
Arseny Sher
e6eb548491 create extension pg_wait_sampling in compute_ctl 2023-11-16 20:54:02 +00:00
Arseny Sher
16e9eb2832 Try to enable a custom postgres_exporter query. 2023-11-16 20:54:02 +00:00
Arseny Sher
042686183b Add pg_wait_sampling extension. 2023-11-16 20:54:02 +00:00
khanova
0c243faf96 Proxy log pid hack (#5869)
## Problem

Improve observability for the compute node.

## Summary of changes

Log pid from the compute node. Doesn't work with pgbouncer.
2023-11-16 20:46:23 +00:00
Em Sharnoff
d0a842a509 Update vm-builder to v0.19.0 and move its customization here (#5783)
ref neondatabase/autoscaling#600 for more
2023-11-16 18:17:42 +01:00
10 changed files with 85 additions and 54 deletions

10
Cargo.lock generated
View File

@@ -3221,7 +3221,7 @@ dependencies = [
[[package]] [[package]]
name = "postgres" name = "postgres"
version = "0.19.4" version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048" source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
dependencies = [ dependencies = [
"bytes", "bytes",
"fallible-iterator", "fallible-iterator",
@@ -3234,7 +3234,7 @@ dependencies = [
[[package]] [[package]]
name = "postgres-native-tls" name = "postgres-native-tls"
version = "0.5.0" version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048" source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
dependencies = [ dependencies = [
"native-tls", "native-tls",
"tokio", "tokio",
@@ -3245,7 +3245,7 @@ dependencies = [
[[package]] [[package]]
name = "postgres-protocol" name = "postgres-protocol"
version = "0.6.4" version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048" source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
dependencies = [ dependencies = [
"base64 0.20.0", "base64 0.20.0",
"byteorder", "byteorder",
@@ -3263,7 +3263,7 @@ dependencies = [
[[package]] [[package]]
name = "postgres-types" name = "postgres-types"
version = "0.2.4" version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048" source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
dependencies = [ dependencies = [
"bytes", "bytes",
"fallible-iterator", "fallible-iterator",
@@ -4933,7 +4933,7 @@ dependencies = [
[[package]] [[package]]
name = "tokio-postgres" name = "tokio-postgres"
version = "0.7.7" version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048" source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"byteorder", "byteorder",

View File

@@ -165,11 +165,11 @@ env_logger = "0.10"
log = "0.4" log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed ## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" } postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" } postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" } postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" } postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" } tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
## Other git libraries ## Other git libraries
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
@@ -206,7 +206,7 @@ tonic-build = "0.9"
# This is only needed for proxy's tests. # This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead. # TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" } tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
################# Binary contents sections ################# Binary contents sections

View File

@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::env; use std::env;
use std::fs; use std::fs;
use std::io::BufRead; use std::io::BufRead;
use std::io::Write;
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
use std::path::Path; use std::path::Path;
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
@@ -14,6 +15,7 @@ use chrono::{DateTime, Utc};
use futures::future::join_all; use futures::future::join_all;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use notify::event;
use postgres::{Client, NoTls}; use postgres::{Client, NoTls};
use tokio; use tokio;
use tokio_postgres; use tokio_postgres;
@@ -644,9 +646,30 @@ impl ComputeNode {
} else { } else {
vec![] vec![]
}) })
.stderr(Stdio::piped())
.spawn() .spawn()
.expect("cannot start postgres process"); .expect("cannot start postgres process");
let stderr = pg.stderr.take().unwrap();
std::thread::spawn(move || {
let reader = std::io::BufReader::new(stderr);
let mut last_lines = vec![];
for line in reader.lines() {
if let Ok(line) = line {
if line.starts_with("2023-") {
// print all lines from the previous postgres instance
let combined = format!("PG:{}\n", last_lines.join("\u{200B}"));
let res = std::io::stderr().lock().write_all(combined.as_bytes());
if let Err(e) = res {
error!("failed to write to stderr: {}", e);
}
last_lines.clear();
}
last_lines.push(line);
}
}
});
wait_for_postgres(&mut pg, pgdata_path)?; wait_for_postgres(&mut pg, pgdata_path)?;
Ok(pg) Ok(pg)

View File

@@ -670,6 +670,12 @@ pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()>
info!("creating system extensions with query: {}", query); info!("creating system extensions with query: {}", query);
client.simple_query(query)?; client.simple_query(query)?;
} }
if libs.contains("pg_wait_sampling") {
// Create extension only if this compute really needs it
let query = "CREATE EXTENSION IF NOT EXISTS pg_wait_sampling";
info!("creating system extensions with query: {}", query);
client.simple_query(query)?;
}
} }
Ok(()) Ok(())

View File

@@ -248,6 +248,7 @@ impl ConnCfg {
// connect_raw() will not use TLS if sslmode is "disable" // connect_raw() will not use TLS if sslmode is "disable"
let (client, connection) = self.0.connect_raw(stream, tls).await?; let (client, connection) = self.0.connect_raw(stream, tls).await?;
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
let stream = connection.stream.into_inner(); let stream = connection.stream.into_inner();
info!( info!(

View File

@@ -514,7 +514,7 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg
} }
/// Try to connect to the compute node once. /// Try to connect to the compute node once.
#[tracing::instrument(name = "connect_once", skip_all)] #[tracing::instrument(name = "connect_once", fields(pid = tracing::field::Empty), skip_all)]
async fn connect_to_compute_once( async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo, node_info: &console::CachedNodeInfo,
timeout: time::Duration, timeout: time::Duration,

View File

@@ -208,6 +208,10 @@ impl GlobalConnPool {
} else { } else {
info!("pool: reusing connection '{conn_info}'"); info!("pool: reusing connection '{conn_info}'");
client.session.send(session_id)?; client.session.send(session_id)?;
tracing::Span::current().record(
"pid",
&tracing::field::display(client.inner.get_process_id()),
);
latency_timer.pool_hit(); latency_timer.pool_hit();
latency_timer.success(); latency_timer.success();
return Ok(Client::new(client, pool).await); return Ok(Client::new(client, pool).await);
@@ -224,6 +228,12 @@ impl GlobalConnPool {
) )
.await .await
}; };
if let Ok(client) = &new_client {
tracing::Span::current().record(
"pid",
&tracing::field::display(client.inner.get_process_id()),
);
}
match &new_client { match &new_client {
// clear the hash. it's no longer valid // clear the hash. it's no longer valid
@@ -257,12 +267,11 @@ impl GlobalConnPool {
} }
_ => {} _ => {}
} }
let new_client = new_client?;
// new_client.map(|inner| Client::new(inner, pool).await) Ok(Client::new(new_client, pool).await)
Ok(Client::new(new_client?, pool).await)
} }
fn put(&self, conn_info: &ConnInfo, client: ClientInner, pid: i32) -> anyhow::Result<()> { fn put(&self, conn_info: &ConnInfo, client: ClientInner) -> anyhow::Result<()> {
let conn_id = client.conn_id; let conn_id = client.conn_id;
// We want to hold this open while we return. This ensures that the pool can't close // We want to hold this open while we return. This ensures that the pool can't close
@@ -306,9 +315,9 @@ impl GlobalConnPool {
// do logging outside of the mutex // do logging outside of the mutex
if returned { if returned {
info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}, pid={pid}"); info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
} else { } else {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}, pid={pid}"); info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
} }
Ok(()) Ok(())
@@ -385,7 +394,7 @@ impl ConnectMechanism for TokioMechanism<'_> {
// Wake up the destination if needed. Code here is a bit involved because // Wake up the destination if needed. Code here is a bit involved because
// we reuse the code from the usual proxy and we need to prepare few structures // we reuse the code from the usual proxy and we need to prepare few structures
// that this code expects. // that this code expects.
#[tracing::instrument(skip_all)] #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
async fn connect_to_compute( async fn connect_to_compute(
config: &config::ProxyConfig, config: &config::ProxyConfig,
conn_info: &ConnInfo, conn_info: &ConnInfo,
@@ -452,6 +461,7 @@ async fn connect_to_compute_once(
.connect_timeout(timeout) .connect_timeout(timeout)
.connect(tokio_postgres::NoTls) .connect(tokio_postgres::NoTls)
.await?; .await?;
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
let (tx, mut rx) = tokio::sync::watch::channel(session); let (tx, mut rx) = tokio::sync::watch::channel(session);
@@ -519,22 +529,6 @@ struct ClientInner {
conn_id: uuid::Uuid, conn_id: uuid::Uuid,
} }
impl ClientInner {
pub async fn get_pid(&mut self) -> anyhow::Result<i32> {
let rows = self.inner.query("select pg_backend_pid();", &[]).await?;
if rows.len() != 1 {
Err(anyhow::anyhow!(
"expected 1 row from pg_backend_pid(), got {}",
rows.len()
))
} else {
let pid = rows[0].get(0);
info!(%pid, "got pid");
Ok(pid)
}
}
}
impl Client { impl Client {
pub fn metrics(&self) -> Arc<MetricCounter> { pub fn metrics(&self) -> Arc<MetricCounter> {
USAGE_METRICS.register(self.inner.as_ref().unwrap().ids.clone()) USAGE_METRICS.register(self.inner.as_ref().unwrap().ids.clone())
@@ -546,7 +540,6 @@ pub struct Client {
span: Span, span: Span,
inner: Option<ClientInner>, inner: Option<ClientInner>,
pool: Option<(ConnInfo, Arc<GlobalConnPool>)>, pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
pid: i32,
} }
pub struct Discard<'a> { pub struct Discard<'a> {
@@ -556,12 +549,11 @@ pub struct Discard<'a> {
impl Client { impl Client {
pub(self) async fn new( pub(self) async fn new(
mut inner: ClientInner, inner: ClientInner,
pool: Option<(ConnInfo, Arc<GlobalConnPool>)>, pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
) -> Self { ) -> Self {
Self { Self {
conn_id: inner.conn_id, conn_id: inner.conn_id,
pid: inner.get_pid().await.unwrap_or(-1),
inner: Some(inner), inner: Some(inner),
span: Span::current(), span: Span::current(),
pool, pool,
@@ -573,7 +565,6 @@ impl Client {
pool, pool,
conn_id, conn_id,
span: _, span: _,
pid: _,
} = self; } = self;
( (
&mut inner &mut inner
@@ -630,11 +621,10 @@ impl Drop for Client {
.expect("client inner should not be removed"); .expect("client inner should not be removed");
if let Some((conn_info, conn_pool)) = self.pool.take() { if let Some((conn_info, conn_pool)) = self.pool.take() {
let current_span = self.span.clone(); let current_span = self.span.clone();
let pid = self.pid;
// return connection to the pool // return connection to the pool
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let _span = current_span.enter(); let _span = current_span.enter();
let _ = conn_pool.put(&conn_info, client, pid); let _ = conn_pool.put(&conn_info, client);
}); });
} }
} }

View File

@@ -250,7 +250,7 @@ pub async fn handle(
Ok(response) Ok(response)
} }
#[instrument(name = "sql-over-http", skip_all)] #[instrument(name = "sql-over-http", fields(pid = tracing::field::Empty), skip_all)]
async fn handle_inner( async fn handle_inner(
request: Request<Body>, request: Request<Body>,
sni_hostname: Option<String>, sni_hostname: Option<String>,

View File

@@ -6,12 +6,14 @@ from fixtures.pageserver.http import PageserverHttpClient
def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient): def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
tenant_id, timeline_id = env.neon_cli.create_tenant() tenant_id, timeline_id = env.neon_cli.create_tenant()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id) endpoint = env.endpoints.create_start("main", tenant_id=tenant_id, config_lines=[
"log_statement=all",
],)
# we rely upon autocommit after each statement # we rely upon autocommit after each statement
res_1 = endpoint.safe_psql_many( res_1 = endpoint.safe_psql_many(
queries=[ queries=[
"CREATE TABLE t(key int primary key, value text)", "CREATE TABLE \nt(key int primary key, value text)",
"INSERT INTO t SELECT generate_series(1,100000), 'payload'", "INSERT INTO \n\nt SELECT generate_series(1,100000), 'payload'",
"SELECT sum(key) FROM t", "SELECT sum(key) FROM t",
] ]
) )

View File

@@ -48,19 +48,28 @@ files:
} }
- filename: postgres_exporter_queries.yml - filename: postgres_exporter_queries.yml
content: | content: |
postgres_exporter_pg_database_size: pg_wait_sampling:
query: "SELECT pg_database.datname, pg_database_size(pg_database.datname) as bytes, 42 as fourtytwo FROM pg_database" query: "select pid, event_type, event, w.queryid as queryid, query, count from pg_wait_sampling_profile w left join pg_stat_statements s on w.queryid = s.queryid;"
cache_seconds: 30 cache_seconds: 30
metrics: metrics:
- datname: - pid:
usage: "LABEL" usage: "LABEL"
description: "Name of the database" description: "backend pid"
- bytes: - event_type:
usage: "LABEL"
description: "event type"
- event:
usage: "LABEL"
description: "event"
- queryid:
usage: "LABEL"
description: "queryid"
- query:
usage: "LABEL"
description: "query"
- count:
usage: "GAUGE" usage: "GAUGE"
description: "Disk space used by the database" description: "count"
- fourtytwo:
usage: "GAUGE"
description: "fourtytwo"
build: | build: |
# Build cgroup-tools # Build cgroup-tools
# #