diff --git a/libs/postgres_backend/src/lib.rs b/libs/postgres_backend/src/lib.rs index 600f1d728c..8ea4b93fb1 100644 --- a/libs/postgres_backend/src/lib.rs +++ b/libs/postgres_backend/src/lib.rs @@ -81,17 +81,16 @@ pub fn is_expected_io_error(e: &io::Error) -> bool { ) } -#[async_trait::async_trait] pub trait Handler { /// Handle single query. /// postgres_backend will issue ReadyForQuery after calling this (this /// might be not what we want after CopyData streaming, but currently we don't /// care). It will also flush out the output buffer. - async fn process_query( + fn process_query( &mut self, pgb: &mut PostgresBackend, query_string: &str, - ) -> Result<(), QueryError>; + ) -> impl Future>; /// Called on startup packet receival, allows to process params. /// diff --git a/libs/postgres_backend/tests/simple_select.rs b/libs/postgres_backend/tests/simple_select.rs index 7ec85f0dbe..900083ea7f 100644 --- a/libs/postgres_backend/tests/simple_select.rs +++ b/libs/postgres_backend/tests/simple_select.rs @@ -23,7 +23,6 @@ async fn make_tcp_pair() -> (TcpStream, TcpStream) { struct TestHandler {} -#[async_trait::async_trait] impl Handler for TestHandler { // return single col 'hey' for any query async fn process_query( diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 39c6a6fb74..9261b7481d 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1199,7 +1199,6 @@ impl PageServerHandler { } } -#[async_trait::async_trait] impl postgres_backend::Handler for PageServerHandler where IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, diff --git a/proxy/src/console/mgmt.rs b/proxy/src/console/mgmt.rs index 2ed4f5f206..ee5f83ee76 100644 --- a/proxy/src/console/mgmt.rs +++ b/proxy/src/console/mgmt.rs @@ -78,7 +78,7 @@ pub(crate) type ComputeReady = DatabaseInfo; // TODO: replace with an http-based protocol. struct MgmtHandler; -#[async_trait::async_trait] + impl postgres_backend::Handler for MgmtHandler { async fn process_query( &mut self, diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 2c519433ef..3f00b69cde 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -2,6 +2,7 @@ //! protocol commands. use anyhow::Context; +use std::future::Future; use std::str::{self, FromStr}; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite}; @@ -95,7 +96,6 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str { } } -#[async_trait::async_trait] impl postgres_backend::Handler for SafekeeperPostgresHandler { @@ -197,49 +197,51 @@ impl postgres_backend::Handler Ok(()) } - async fn process_query( + fn process_query( &mut self, pgb: &mut PostgresBackend, query_string: &str, - ) -> Result<(), QueryError> { - if query_string - .to_ascii_lowercase() - .starts_with("set datestyle to ") - { - // important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect - pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; - return Ok(()); - } - - let cmd = parse_cmd(query_string)?; - let cmd_str = cmd_to_string(&cmd); - - let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard(); - - info!("got query {:?}", query_string); - - let tenant_id = self.tenant_id.context("tenantid is required")?; - let timeline_id = self.timeline_id.context("timelineid is required")?; - self.check_permission(Some(tenant_id))?; - self.ttid = TenantTimelineId::new(tenant_id, timeline_id); - - match cmd { - SafekeeperPostgresCommand::StartWalPush => { - self.handle_start_wal_push(pgb) - .instrument(info_span!("WAL receiver")) - .await + ) -> impl Future> { + Box::pin(async move { + if query_string + .to_ascii_lowercase() + .starts_with("set datestyle to ") + { + // important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect + pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + return Ok(()); } - SafekeeperPostgresCommand::StartReplication { start_lsn, term } => { - self.handle_start_replication(pgb, start_lsn, term) - .instrument(info_span!("WAL sender")) - .await + + let cmd = parse_cmd(query_string)?; + let cmd_str = cmd_to_string(&cmd); + + let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard(); + + info!("got query {:?}", query_string); + + let tenant_id = self.tenant_id.context("tenantid is required")?; + let timeline_id = self.timeline_id.context("timelineid is required")?; + self.check_permission(Some(tenant_id))?; + self.ttid = TenantTimelineId::new(tenant_id, timeline_id); + + match cmd { + SafekeeperPostgresCommand::StartWalPush => { + self.handle_start_wal_push(pgb) + .instrument(info_span!("WAL receiver")) + .await + } + SafekeeperPostgresCommand::StartReplication { start_lsn, term } => { + self.handle_start_replication(pgb, start_lsn, term) + .instrument(info_span!("WAL sender")) + .await + } + SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await, + SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await, + SafekeeperPostgresCommand::JSONCtrl { ref cmd } => { + handle_json_ctrl(self, pgb, cmd).await + } } - SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await, - SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await, - SafekeeperPostgresCommand::JSONCtrl { ref cmd } => { - handle_json_ctrl(self, pgb, cmd).await - } - } + }) } }