refactor: Cleanup page service (#4097)

Refactoring part of #4093.

Numerious `Send + Sync` bounds were a distraction, that were not needed
at all. The proper `Bytes` usage and one `"error_message".to_string()`
are just drive-by fixes.

Not using the `PostgresBackendTCP` allows us to start setting read
timeouts (and more). `PostgresBackendTCP` is still used from proxy, so
it cannot be removed.
This commit is contained in:
Joonas Koivunen
2023-04-27 18:51:57 +03:00
committed by GitHub
parent d1e86d65dc
commit fdf5e4db5e
2 changed files with 46 additions and 26 deletions

View File

@@ -114,7 +114,7 @@ async fn import_rel(
path: &Path,
spcoid: Oid,
dboid: Oid,
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
reader: &mut (impl AsyncRead + Unpin),
len: usize,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -200,7 +200,7 @@ async fn import_slru(
modification: &mut DatadirModification<'_>,
slru: SlruKind,
path: &Path,
reader: &mut (impl AsyncRead + Send + Sync + Unpin),
reader: &mut (impl AsyncRead + Unpin),
len: usize,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -612,8 +612,8 @@ async fn import_file(
Ok(None)
}
async fn read_all_bytes(reader: &mut (impl AsyncRead + Send + Sync + Unpin)) -> Result<Bytes> {
async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
let mut buf: Vec<u8> = vec![];
reader.read_to_end(&mut buf).await?;
Ok(Bytes::copy_from_slice(&buf[..]))
Ok(Bytes::from(buf))
}

View File

@@ -20,7 +20,6 @@ use pageserver_api::models::{
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamNblocksRequest, PagestreamNblocksResponse,
};
use postgres_backend::PostgresBackendTCP;
use postgres_backend::{self, is_expected_io_error, AuthType, PostgresBackend, QueryError};
use pq_proto::framed::ConnectionError;
use pq_proto::FeStartupPacket;
@@ -32,6 +31,7 @@ use std::str;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
use tracing::*;
use utils::id::ConnectionId;
@@ -57,7 +57,10 @@ use crate::trace::Tracer;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<Bytes>> + '_ {
fn copyin_stream<IO>(pgb: &mut PostgresBackend<IO>) -> impl Stream<Item = io::Result<Bytes>> + '_
where
IO: AsyncRead + AsyncWrite + Unpin,
{
async_stream::try_stream! {
loop {
let msg = tokio::select! {
@@ -65,8 +68,8 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<
_ = task_mgr::shutdown_watcher() => {
// We were requested to shut down.
let msg = "pageserver is shutting down".to_string();
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None));
let msg = "pageserver is shutting down";
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
Err(QueryError::Other(anyhow::anyhow!(msg)))
}
@@ -125,7 +128,7 @@ fn copyin_stream(pgb: &mut PostgresBackendTCP) -> impl Stream<Item = io::Result<
///
/// XXX: Currently, any trailing data after the EOF marker prints a warning.
/// Perhaps it should be a hard error?
async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow::Result<()> {
async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
use tokio::io::AsyncReadExt;
let mut buf = [0u8; 512];
@@ -245,12 +248,14 @@ async fn page_service_conn_main(
.set_nodelay(true)
.context("could not set TCP_NODELAY")?;
let peer_addr = socket.peer_addr().context("get peer address")?;
// XXX: pgbackend.run() should take the connection_ctx,
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(conf, auth, connection_ctx);
let pgbackend = PostgresBackend::new(socket, auth_type, None)?;
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend
.run(&mut conn_handler, task_mgr::shutdown_watcher)
@@ -332,13 +337,16 @@ impl PageServerHandler {
}
#[instrument(skip(self, pgb, ctx))]
async fn handle_pagerequests(
async fn handle_pagerequests<IO>(
&self,
pgb: &mut PostgresBackendTCP,
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
ctx: RequestContext,
) -> anyhow::Result<()> {
) -> anyhow::Result<()>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
// NOTE: pagerequests handler exits when connection is closed,
// so there is no need to reset the association
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
@@ -436,16 +444,19 @@ impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
#[instrument(skip(self, pgb, ctx))]
async fn handle_import_basebackup(
async fn handle_import_basebackup<IO>(
&self,
pgb: &mut PostgresBackendTCP,
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
base_lsn: Lsn,
_end_lsn: Lsn,
pg_version: u32,
ctx: RequestContext,
) -> Result<(), QueryError> {
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Create empty timeline
info!("creating new timeline");
@@ -486,15 +497,18 @@ impl PageServerHandler {
}
#[instrument(skip(self, pgb, ctx))]
async fn handle_import_wal(
async fn handle_import_wal<IO>(
&self,
pgb: &mut PostgresBackendTCP,
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
ctx: RequestContext,
) -> Result<(), QueryError> {
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
let timeline = get_active_tenant_timeline(tenant_id, timeline_id, &ctx).await?;
@@ -690,16 +704,19 @@ impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
#[instrument(skip(self, pgb, ctx))]
async fn handle_basebackup_request(
async fn handle_basebackup_request<IO>(
&mut self,
pgb: &mut PostgresBackendTCP,
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Option<Lsn>,
prev_lsn: Option<Lsn>,
full_backup: bool,
ctx: RequestContext,
) -> anyhow::Result<()> {
) -> anyhow::Result<()>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
let started = std::time::Instant::now();
// check that the timeline exists
@@ -770,10 +787,13 @@ impl PageServerHandler {
}
#[async_trait::async_trait]
impl postgres_backend::Handler<tokio::net::TcpStream> for PageServerHandler {
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
fn check_auth_jwt(
&mut self,
_pgb: &mut PostgresBackendTCP,
_pgb: &mut PostgresBackend<IO>,
jwt_response: &[u8],
) -> Result<(), QueryError> {
// this unwrap is never triggered, because check_auth_jwt only called when auth_type is NeonJWT
@@ -801,7 +821,7 @@ impl postgres_backend::Handler<tokio::net::TcpStream> for PageServerHandler {
fn startup(
&mut self,
_pgb: &mut PostgresBackendTCP,
_pgb: &mut PostgresBackend<IO>,
_sm: &FeStartupPacket,
) -> Result<(), QueryError> {
Ok(())
@@ -809,7 +829,7 @@ impl postgres_backend::Handler<tokio::net::TcpStream> for PageServerHandler {
async fn process_query(
&mut self,
pgb: &mut PostgresBackendTCP,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> Result<(), QueryError> {
let ctx = self.connection_ctx.attached_child();