pg_sni_router: add session_id to more messages (#4403)

See superceded #4390.

- capture log in test
- expand the span to cover init and error reporting
- remove obvious logging by logging only unexpected
This commit is contained in:
Joonas Koivunen
2023-06-02 14:59:10 +03:00
committed by GitHub
parent 66cdba990a
commit ef80a902c8
2 changed files with 20 additions and 11 deletions

View File

@@ -17,7 +17,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::sync::CancellationToken;
use utils::{project_git_version, sentry_init::init_sentry};
use tracing::{error, info, warn};
use tracing::{error, info, warn, Instrument};
project_git_version!(GIT_VERSION);
@@ -141,7 +141,6 @@ async fn task_main(
tokio::select! {
accept_result = listener.accept() => {
let (socket, peer_addr) = accept_result?;
info!("accepted postgres client connection from {peer_addr}");
let session_id = uuid::Uuid::new_v4();
let tls_config = Arc::clone(&tls_config);
@@ -149,18 +148,18 @@ async fn task_main(
connections.spawn(
async move {
info!("spawned a task for {peer_addr}");
socket
.set_nodelay(true)
.context("failed to set socket option")?;
handle_client(dest_suffix, tls_config, session_id, socket).await
info!(%peer_addr, "serving");
handle_client(dest_suffix, tls_config, socket).await
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
error!("per-client task finished with an error: {e:#}");
}),
})
.instrument(tracing::info_span!("handle_client", ?session_id))
);
}
_ = cancellation_token.cancelled() => {
@@ -192,7 +191,6 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
let mut stream = PqStream::new(Stream::from_raw(raw_stream));
let msg = stream.read_startup_packet().await?;
info!("received {msg:?}");
use pq_proto::FeStartupPacket::*;
match msg {
@@ -215,15 +213,19 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
}
Ok(raw.upgrade(tls_config).await?)
}
_ => stream.throw_error_str(ERR_INSECURE_CONNECTION).await?,
unexpected => {
info!(
?unexpected,
"unexpected startup packet, rejecting connection"
);
stream.throw_error_str(ERR_INSECURE_CONNECTION).await?
}
}
}
#[tracing::instrument(fields(session_id = ?session_id), skip_all)]
async fn handle_client(
dest_suffix: Arc<String>,
tls_config: Arc<rustls::ServerConfig>,
session_id: uuid::Uuid,
stream: impl AsyncRead + AsyncWrite + Unpin,
) -> anyhow::Result<()> {
let tls_stream = ssl_handshake(stream, tls_config).await?;

View File

@@ -37,6 +37,7 @@ class PgSniRouter(PgProtocol):
destination: str,
tls_cert: Path,
tls_key: Path,
test_output_dir: Path,
):
# Must use a hostname rather than IP here, for SNI to work
host = "localhost"
@@ -49,6 +50,7 @@ class PgSniRouter(PgProtocol):
self.tls_cert = tls_cert
self.tls_key = tls_key
self._popen: Optional[subprocess.Popen[bytes]] = None
self.test_output_dir = test_output_dir
def start(self) -> "PgSniRouter":
assert self._popen is None
@@ -60,8 +62,12 @@ class PgSniRouter(PgProtocol):
*["--destination", self.destination],
]
self._popen = subprocess.Popen(args)
router_log_path = self.test_output_dir / "pg_sni_router.log"
router_log = open(router_log_path, "w")
self._popen = subprocess.Popen(args, stderr=router_log)
self._wait_until_ready()
log.info(f"pg_sni_router started, log file: {router_log_path}")
return self
@backoff.on_exception(backoff.expo, OSError, max_time=10)
@@ -121,6 +127,7 @@ def test_pg_sni_router(
destination="localtest.me",
tls_cert=test_output_dir / "router.crt",
tls_key=test_output_dir / "router.key",
test_output_dir=test_output_dir,
) as router:
router.start()