Compare commits

...

6 Commits

Author SHA1 Message Date
Arpad Müller
548b28b28b WIP remove generic argument from safekeeper 2024-04-28 01:32:12 +02:00
Arpad Müller
f9039f7d73 Remove async_trait again 2024-04-28 01:15:07 +02:00
Arpad Müller
2a7bc782fd Remove generic IO argument 2024-04-28 01:11:59 +02:00
Arpad Müller
307d10b111 Add HandlerSync trait 2024-04-28 01:01:21 +02:00
Arpad Müller
5546c0de35 Same for SafekeeperPostgresHandler 2024-04-28 00:36:45 +02:00
Arpad Müller
d1fb4a4a00 Move PageServerHandler process_query content 2024-04-28 00:30:46 +02:00
4 changed files with 41 additions and 10 deletions

View File

@@ -78,18 +78,18 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
)
}
#[async_trait::async_trait]
pub trait Handler<IO> {
pub trait Handler<IO>: HandlerSync<IO> {
/// Handle single query.
/// postgres_backend will issue ReadyForQuery after calling this (this
/// might be not what we want after CopyData streaming, but currently we don't
/// care). It will also flush out the output buffer.
async fn process_query(
fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> Result<(), QueryError>;
) -> impl Future<Output = Result<(), QueryError>> + Send;
}
pub trait HandlerSync<IO> {
/// Called on startup packet receival, allows to process params.
///
/// If Ok(false) is returned postgres_backend will skip auth -- that is needed for new users

View File

@@ -1368,8 +1368,7 @@ impl PageServerHandler {
}
}
#[async_trait::async_trait]
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
impl<IO> postgres_backend::HandlerSync<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
@@ -1409,9 +1408,24 @@ where
) -> Result<(), QueryError> {
Ok(())
}
}
type IO<'s> = std::pin::Pin<&'s mut tokio_io_timeout::TimeoutReader<tokio::net::TcpStream>>;
impl<'s> postgres_backend::Handler<IO<'s>> for PageServerHandler
{
#[instrument(skip_all, fields(tenant_id, timeline_id))]
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO<'s>>,
query_string: &str,
) -> Result<(), QueryError> {
self.process_query_(pgb, &query_string).await
}
}
impl PageServerHandler {
async fn process_query_<IO: AsyncRead + AsyncWrite + Send + Sync + Unpin>(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,

View File

@@ -75,7 +75,6 @@ pub type ComputeReady = DatabaseInfo;
// TODO: replace with an http-based protocol.
struct MgmtHandler;
#[async_trait::async_trait]
impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
async fn process_query(
&mut self,
@@ -89,6 +88,8 @@ impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
}
}
impl postgres_backend::HandlerSync<tokio::net::TcpStream> for MgmtHandler {}
fn try_process_query(pgb: &mut PostgresBackendTCP, query: &str) -> Result<(), QueryError> {
let resp: KickSession = serde_json::from_str(query).context("Failed to parse query as json")?;

View File

@@ -2,10 +2,13 @@
//! protocol commands.
use anyhow::Context;
use std::net::TcpStream;
use std::str::{self, FromStr};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_io_timeout::TimeoutReader;
use tracing::{debug, info, info_span, Instrument};
use utils::measured_stream::MeasuredStream;
use crate::auth::check_permission;
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
@@ -95,8 +98,7 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
}
}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::HandlerSync<IO>
for SafekeeperPostgresHandler
{
// tenant_id and timeline_id are passed in connection string params
@@ -191,8 +193,22 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
self.claims = Some(data.claims);
Ok(())
}
}
type IO<'s, R: FnMut(usize), W> =
MeasuredStream<std::pin::Pin<&'s mut TimeoutReader<TcpStream>>, R, W>;
impl<'s, R: FnMut(usize), W> postgres_backend::Handler<IO<'s, R, W>> for SafekeeperPostgresHandler {
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO<'s, R, W>>,
query_string: &str,
) -> Result<(), QueryError> {
self.process_query_(pgb, &query_string).await
}
}
impl SafekeeperPostgresHandler {
async fn process_query_<IO: AsyncRead + AsyncWrite + Send + Unpin>(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,