Compare commits

..

6 Commits

Author SHA1 Message Date
Arseny Sher
1bf5e07da1 Try to enable a custom postgres_exporter query. 2023-11-16 19:35:04 +01:00
Arseny Sher
87de91b004 Add pg_wait_sampling extension. 2023-11-16 19:34:48 +01:00
Arthur Petukhovsky
2f217f9ebd Print pid to the logs 2023-11-16 19:34:32 +01:00
Anna Khanova
71491dd467 Fix build 2023-11-16 19:34:16 +01:00
Anna Khanova
67e791c4ec Fmt 2023-11-16 19:34:02 +01:00
Anna Khanova
517782ab94 Log pid in proxy 2023-11-16 19:33:54 +01:00
10 changed files with 54 additions and 85 deletions

10
Cargo.lock generated
View File

@@ -3221,7 +3221,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
dependencies = [
"bytes",
"fallible-iterator",
@@ -3234,7 +3234,7 @@ dependencies = [
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
dependencies = [
"native-tls",
"tokio",
@@ -3245,7 +3245,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -3263,7 +3263,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4933,7 +4933,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=6ce32f791526e27533cab0232a6bb243b2c32584#6ce32f791526e27533cab0232a6bb243b2c32584"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
dependencies = [
"async-trait",
"byteorder",

View File

@@ -165,11 +165,11 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
## Other git libraries
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
@@ -206,7 +206,7 @@ tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="6ce32f791526e27533cab0232a6bb243b2c32584" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
################# Binary contents sections

View File

@@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::env;
use std::fs;
use std::io::BufRead;
use std::io::Write;
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, Stdio};
@@ -15,7 +14,6 @@ use chrono::{DateTime, Utc};
use futures::future::join_all;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use notify::event;
use postgres::{Client, NoTls};
use tokio;
use tokio_postgres;
@@ -646,30 +644,9 @@ impl ComputeNode {
} else {
vec![]
})
.stderr(Stdio::piped())
.spawn()
.expect("cannot start postgres process");
let stderr = pg.stderr.take().unwrap();
std::thread::spawn(move || {
let reader = std::io::BufReader::new(stderr);
let mut last_lines = vec![];
for line in reader.lines() {
if let Ok(line) = line {
if line.starts_with("2023-") {
// print all lines from the previous postgres instance
let combined = format!("PG:{}\n", last_lines.join("\u{200B}"));
let res = std::io::stderr().lock().write_all(combined.as_bytes());
if let Err(e) = res {
error!("failed to write to stderr: {}", e);
}
last_lines.clear();
}
last_lines.push(line);
}
}
});
wait_for_postgres(&mut pg, pgdata_path)?;
Ok(pg)

View File

@@ -670,12 +670,6 @@ pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()>
info!("creating system extensions with query: {}", query);
client.simple_query(query)?;
}
if libs.contains("pg_wait_sampling") {
// Create extension only if this compute really needs it
let query = "CREATE EXTENSION IF NOT EXISTS pg_wait_sampling";
info!("creating system extensions with query: {}", query);
client.simple_query(query)?;
}
}
Ok(())

View File

@@ -248,7 +248,6 @@ impl ConnCfg {
// connect_raw() will not use TLS if sslmode is "disable"
let (client, connection) = self.0.connect_raw(stream, tls).await?;
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
let stream = connection.stream.into_inner();
info!(

View File

@@ -514,7 +514,7 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg
}
/// Try to connect to the compute node once.
#[tracing::instrument(name = "connect_once", fields(pid = tracing::field::Empty), skip_all)]
#[tracing::instrument(name = "connect_once", skip_all)]
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
timeout: time::Duration,

View File

@@ -208,10 +208,6 @@ impl GlobalConnPool {
} else {
info!("pool: reusing connection '{conn_info}'");
client.session.send(session_id)?;
tracing::Span::current().record(
"pid",
&tracing::field::display(client.inner.get_process_id()),
);
latency_timer.pool_hit();
latency_timer.success();
return Ok(Client::new(client, pool).await);
@@ -228,12 +224,6 @@ impl GlobalConnPool {
)
.await
};
if let Ok(client) = &new_client {
tracing::Span::current().record(
"pid",
&tracing::field::display(client.inner.get_process_id()),
);
}
match &new_client {
// clear the hash. it's no longer valid
@@ -267,11 +257,12 @@ impl GlobalConnPool {
}
_ => {}
}
let new_client = new_client?;
Ok(Client::new(new_client, pool).await)
// new_client.map(|inner| Client::new(inner, pool).await)
Ok(Client::new(new_client?, pool).await)
}
fn put(&self, conn_info: &ConnInfo, client: ClientInner) -> anyhow::Result<()> {
fn put(&self, conn_info: &ConnInfo, client: ClientInner, pid: i32) -> anyhow::Result<()> {
let conn_id = client.conn_id;
// We want to hold this open while we return. This ensures that the pool can't close
@@ -315,9 +306,9 @@ impl GlobalConnPool {
// do logging outside of the mutex
if returned {
info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}, pid={pid}");
} else {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}, pid={pid}");
}
Ok(())
@@ -394,7 +385,7 @@ impl ConnectMechanism for TokioMechanism<'_> {
// Wake up the destination if needed. Code here is a bit involved because
// we reuse the code from the usual proxy and we need to prepare few structures
// that this code expects.
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
#[tracing::instrument(skip_all)]
async fn connect_to_compute(
config: &config::ProxyConfig,
conn_info: &ConnInfo,
@@ -461,7 +452,6 @@ async fn connect_to_compute_once(
.connect_timeout(timeout)
.connect(tokio_postgres::NoTls)
.await?;
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
let (tx, mut rx) = tokio::sync::watch::channel(session);
@@ -529,6 +519,22 @@ struct ClientInner {
conn_id: uuid::Uuid,
}
impl ClientInner {
    /// Asks the backend for its process id by running `select pg_backend_pid();`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the query itself, and returns an
    /// error when the result does not consist of exactly one row.
    pub async fn get_pid(&mut self) -> anyhow::Result<i32> {
        let rows = self.inner.query("select pg_backend_pid();", &[]).await?;

        // Exactly one row is expected; anything else is reported to the caller.
        let [row] = rows.as_slice() else {
            return Err(anyhow::anyhow!(
                "expected 1 row from pg_backend_pid(), got {}",
                rows.len()
            ));
        };

        let pid: i32 = row.get(0);
        // Log the pid at info level before handing it back.
        info!(%pid, "got pid");
        Ok(pid)
    }
}
impl Client {
pub fn metrics(&self) -> Arc<MetricCounter> {
USAGE_METRICS.register(self.inner.as_ref().unwrap().ids.clone())
@@ -540,6 +546,7 @@ pub struct Client {
span: Span,
inner: Option<ClientInner>,
pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
pid: i32,
}
pub struct Discard<'a> {
@@ -549,11 +556,12 @@ pub struct Discard<'a> {
impl Client {
pub(self) async fn new(
inner: ClientInner,
mut inner: ClientInner,
pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
) -> Self {
Self {
conn_id: inner.conn_id,
pid: inner.get_pid().await.unwrap_or(-1),
inner: Some(inner),
span: Span::current(),
pool,
@@ -565,6 +573,7 @@ impl Client {
pool,
conn_id,
span: _,
pid: _,
} = self;
(
&mut inner
@@ -621,10 +630,11 @@ impl Drop for Client {
.expect("client inner should not be removed");
if let Some((conn_info, conn_pool)) = self.pool.take() {
let current_span = self.span.clone();
let pid = self.pid;
// return connection to the pool
tokio::task::spawn_blocking(move || {
let _span = current_span.enter();
let _ = conn_pool.put(&conn_info, client);
let _ = conn_pool.put(&conn_info, client, pid);
});
}
}

View File

@@ -250,7 +250,7 @@ pub async fn handle(
Ok(response)
}
#[instrument(name = "sql-over-http", fields(pid = tracing::field::Empty), skip_all)]
#[instrument(name = "sql-over-http", skip_all)]
async fn handle_inner(
request: Request<Body>,
sni_hostname: Option<String>,

View File

@@ -6,14 +6,12 @@ from fixtures.pageserver.http import PageserverHttpClient
def check_tenant(env: NeonEnv, pageserver_http: PageserverHttpClient):
tenant_id, timeline_id = env.neon_cli.create_tenant()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id, config_lines=[
"log_statement=all",
],)
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
# we rely upon autocommit after each statement
res_1 = endpoint.safe_psql_many(
queries=[
"CREATE TABLE \nt(key int primary key, value text)",
"INSERT INTO \n\nt SELECT generate_series(1,100000), 'payload'",
"CREATE TABLE t(key int primary key, value text)",
"INSERT INTO t SELECT generate_series(1,100000), 'payload'",
"SELECT sum(key) FROM t",
]
)

View File

@@ -48,28 +48,19 @@ files:
}
- filename: postgres_exporter_queries.yml
content: |
pg_wait_sampling:
query: "select pid, event_type, event, w.queryid as queryid, query, count from pg_wait_sampling_profile w left join pg_stat_statements s on w.queryid = s.queryid;"
postgres_exporter_pg_database_size:
query: "SELECT pg_database.datname, pg_database_size(pg_database.datname) as bytes, 42 as fourtytwo FROM pg_database"
cache_seconds: 30
metrics:
- pid:
- datname:
usage: "LABEL"
description: "backend pid"
- event_type:
usage: "LABEL"
description: "event type"
- event:
usage: "LABEL"
description: "event"
- queryid:
usage: "LABEL"
description: "queryid"
- query:
usage: "LABEL"
description: "query"
- count:
description: "Name of the database"
- bytes:
usage: "GAUGE"
description: "count"
description: "Disk space used by the database"
- fourtytwo:
usage: "GAUGE"
description: "fourtytwo"
build: |
# Build cgroup-tools
#