From fdf5e4db5ea15550d2cf26109db504092179958b Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Thu, 27 Apr 2023 18:51:57 +0300 Subject: [PATCH] refactor: Cleanup page service (#4097) Refactoring part of #4093. Numerous `Send + Sync` bounds were a distraction and were not needed at all. The proper `Bytes` usage and one `"error_message".to_string()` are just drive-by fixes. Not using the `PostgresBackendTCP` allows us to start setting read timeouts (and more). `PostgresBackendTCP` is still used from proxy, so it cannot be removed. --- pageserver/src/import_datadir.rs | 8 ++-- pageserver/src/page_service.rs | 64 +++++++++++++++++++++----------- 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 39e434a023..936de35eb9 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -114,7 +114,7 @@ async fn import_rel( path: &Path, spcoid: Oid, dboid: Oid, - reader: &mut (impl AsyncRead + Send + Sync + Unpin), + reader: &mut (impl AsyncRead + Unpin), len: usize, ctx: &RequestContext, ) -> anyhow::Result<()> { @@ -200,7 +200,7 @@ async fn import_slru( modification: &mut DatadirModification<'_>, slru: SlruKind, path: &Path, - reader: &mut (impl AsyncRead + Send + Sync + Unpin), + reader: &mut (impl AsyncRead + Unpin), len: usize, ctx: &RequestContext, ) -> anyhow::Result<()> { @@ -612,8 +612,8 @@ async fn import_file( Ok(None) } -async fn read_all_bytes(reader: &mut (impl AsyncRead + Send + Sync + Unpin)) -> Result { +async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result { let mut buf: Vec = vec![]; reader.read_to_end(&mut buf).await?; - Ok(Bytes::copy_from_slice(&buf[..])) + Ok(Bytes::from(buf)) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 135f08e846..3610704f2c 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -20,7 +20,6 @@ use pageserver_api::models::{ 
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse, PagestreamNblocksRequest, PagestreamNblocksResponse, }; -use postgres_backend::PostgresBackendTCP; use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError}; use pq_proto::framed::ConnectionError; use pq_proto::FeStartupPacket; @@ -32,6 +31,7 @@ use std::str; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::io::StreamReader; use tracing::*; use utils::id::ConnectionId; @@ -57,7 +57,10 @@ use crate::trace::Tracer; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::BLCKSZ; -fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream> + '_ { +fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream> + '_ +where + IO: AsyncRead + AsyncWrite + Unpin, +{ async_stream::try_stream! { loop { let msg = tokio::select! { @@ -65,8 +68,8 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream { // We were requested to shut down. - let msg = "pageserver is shutting down".to_string(); - let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)); + let msg = "pageserver is shutting down"; + let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None)); Err(QueryError::Other(anyhow::anyhow!(msg))) } @@ -125,7 +128,7 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream anyhow::Result<()> { +async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> { use tokio::io::AsyncReadExt; let mut buf = [0u8; 512]; @@ -245,12 +248,14 @@ async fn page_service_conn_main( .set_nodelay(true) .context("could not set TCP_NODELAY")?; + let peer_addr = socket.peer_addr().context("get peer address")?; + // XXX: pgbackend.run() should take the connection_ctx, // and create a child per-query context when it invokes process_query. 
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler // and create the per-query context in process_query ourselves. let mut conn_handler = PageServerHandler::new(conf, auth, connection_ctx); - let pgbackend = PostgresBackend::new(socket, auth_type, None)?; + let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; match pgbackend .run(&mut conn_handler, task_mgr::shutdown_watcher) @@ -332,13 +337,16 @@ impl PageServerHandler { } #[instrument(skip(self, pgb, ctx))] - async fn handle_pagerequests( + async fn handle_pagerequests( &self, - pgb: &mut PostgresBackendTCP, + pgb: &mut PostgresBackend, tenant_id: TenantId, timeline_id: TimelineId, ctx: RequestContext, - ) -> anyhow::Result<()> { + ) -> anyhow::Result<()> + where + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, + { // NOTE: pagerequests handler exits when connection is closed, // so there is no need to reset the association task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); @@ -436,16 +444,19 @@ impl PageServerHandler { #[allow(clippy::too_many_arguments)] #[instrument(skip(self, pgb, ctx))] - async fn handle_import_basebackup( + async fn handle_import_basebackup( &self, - pgb: &mut PostgresBackendTCP, + pgb: &mut PostgresBackend, tenant_id: TenantId, timeline_id: TimelineId, base_lsn: Lsn, _end_lsn: Lsn, pg_version: u32, ctx: RequestContext, - ) -> Result<(), QueryError> { + ) -> Result<(), QueryError> + where + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, + { task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); // Create empty timeline info!("creating new timeline"); @@ -486,15 +497,18 @@ impl PageServerHandler { } #[instrument(skip(self, pgb, ctx))] - async fn handle_import_wal( + async fn handle_import_wal( &self, - pgb: &mut PostgresBackendTCP, + pgb: &mut PostgresBackend, tenant_id: TenantId, timeline_id: TimelineId, start_lsn: Lsn, end_lsn: Lsn, ctx: RequestContext, - ) -> Result<(), QueryError> { + ) -> Result<(), 
QueryError> + where + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, + { task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?; @@ -690,16 +704,19 @@ impl PageServerHandler { #[allow(clippy::too_many_arguments)] #[instrument(skip(self, pgb, ctx))] - async fn handle_basebackup_request( + async fn handle_basebackup_request( &mut self, - pgb: &mut PostgresBackendTCP, + pgb: &mut PostgresBackend, tenant_id: TenantId, timeline_id: TimelineId, lsn: Option, prev_lsn: Option, full_backup: bool, ctx: RequestContext, - ) -> anyhow::Result<()> { + ) -> anyhow::Result<()> + where + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, + { let started = std::time::Instant::now(); // check that the timeline exists @@ -770,10 +787,13 @@ impl PageServerHandler { } #[async_trait::async_trait] -impl postgres_backend::Handler for PageServerHandler { +impl postgres_backend::Handler for PageServerHandler +where + IO: AsyncRead + AsyncWrite + Send + Sync + Unpin, +{ fn check_auth_jwt( &mut self, - _pgb: &mut PostgresBackendTCP, + _pgb: &mut PostgresBackend, jwt_response: &[u8], ) -> Result<(), QueryError> { // this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT @@ -801,7 +821,7 @@ impl postgres_backend::Handler for PageServerHandler { fn startup( &mut self, - _pgb: &mut PostgresBackendTCP, + _pgb: &mut PostgresBackend, _sm: &FeStartupPacket, ) -> Result<(), QueryError> { Ok(()) @@ -809,7 +829,7 @@ impl postgres_backend::Handler for PageServerHandler { async fn process_query( &mut self, - pgb: &mut PostgresBackendTCP, + pgb: &mut PostgresBackend, query_string: &str, ) -> Result<(), QueryError> { let ctx = self.connection_ctx.attached_child();