From 51f672b0bb97f31b8e5cdac82dea6bcd8ae57566 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Wed, 1 Mar 2023 20:05:56 +0400 Subject: [PATCH] Rename write_message to write_message_noflush in postgres_backend_async.rs To make it unifrom across the project; proxy stream.rs and older postgres_backend uses write_message_noflush. --- libs/utils/src/postgres_backend_async.rs | 48 ++++++++++++------------ pageserver/src/page_service.rs | 46 +++++++++++------------ 2 files changed, 48 insertions(+), 46 deletions(-) diff --git a/libs/utils/src/postgres_backend_async.rs b/libs/utils/src/postgres_backend_async.rs index b804c54709..442b06ed01 100644 --- a/libs/utils/src/postgres_backend_async.rs +++ b/libs/utils/src/postgres_backend_async.rs @@ -233,7 +233,7 @@ impl PostgresBackend { } /// Write message into internal output buffer. - pub fn write_message(&mut self, message: &BeMessage<'_>) -> io::Result<&mut Self> { + pub fn write_message_noflush(&mut self, message: &BeMessage<'_>) -> io::Result<&mut Self> { BeMessage::write(&mut self.buf_out, message)?; Ok(self) } @@ -383,7 +383,7 @@ impl PostgresBackend { FeStartupPacket::SslRequest => { debug!("SSL requested"); - self.write_message(&BeMessage::EncryptionResponse(have_tls))?; + self.write_message_noflush(&BeMessage::EncryptionResponse(have_tls))?; if have_tls { self.start_tls().await?; self.state = ProtoState::Encrypted; @@ -391,11 +391,11 @@ impl PostgresBackend { } FeStartupPacket::GssEncRequest => { debug!("GSS requested"); - self.write_message(&BeMessage::EncryptionResponse(false))?; + self.write_message_noflush(&BeMessage::EncryptionResponse(false))?; } FeStartupPacket::StartupMessage { .. } => { if have_tls && !matches!(self.state, ProtoState::Encrypted) { - self.write_message(&BeMessage::ErrorResponse( + self.write_message_noflush(&BeMessage::ErrorResponse( "must connect with TLS", None, ))?; @@ -410,15 +410,17 @@ impl PostgresBackend { match self.auth_type { AuthType::Trust => { - self.write_message(&BeMessage::AuthenticationOk)? - .write_message(&BeMessage::CLIENT_ENCODING)? + self.write_message_noflush(&BeMessage::AuthenticationOk)? + .write_message_noflush(&BeMessage::CLIENT_ENCODING)? // The async python driver requires a valid server_version - .write_message(&BeMessage::server_version("14.1"))? - .write_message(&BeMessage::ReadyForQuery)?; + .write_message_noflush(&BeMessage::server_version("14.1"))? + .write_message_noflush(&BeMessage::ReadyForQuery)?; self.state = ProtoState::Established; } AuthType::NeonJWT => { - self.write_message(&BeMessage::AuthenticationCleartextPassword)?; + self.write_message_noflush( + &BeMessage::AuthenticationCleartextPassword, + )?; self.state = ProtoState::Authentication; } } @@ -441,7 +443,7 @@ impl PostgresBackend { let (_, jwt_response) = m.split_last().context("protocol violation")?; if let Err(e) = handler.check_auth_jwt(self, jwt_response) { - self.write_message(&BeMessage::ErrorResponse( + self.write_message_noflush(&BeMessage::ErrorResponse( &e.to_string(), Some(e.pg_error_code()), ))?; @@ -449,9 +451,9 @@ impl PostgresBackend { } } } - self.write_message(&BeMessage::AuthenticationOk)? - .write_message(&BeMessage::CLIENT_ENCODING)? - .write_message(&BeMessage::ReadyForQuery)?; + self.write_message_noflush(&BeMessage::AuthenticationOk)? + .write_message_noflush(&BeMessage::CLIENT_ENCODING)? + .write_message_noflush(&BeMessage::ReadyForQuery)?; self.state = ProtoState::Established; } @@ -486,30 +488,30 @@ impl PostgresBackend { if let Err(e) = handler.process_query(self, query_string).await { log_query_error(query_string, &e); let short_error = short_error(&e); - self.write_message(&BeMessage::ErrorResponse( + self.write_message_noflush(&BeMessage::ErrorResponse( &short_error, Some(e.pg_error_code()), ))?; } - self.write_message(&BeMessage::ReadyForQuery)?; + self.write_message_noflush(&BeMessage::ReadyForQuery)?; } FeMessage::Parse(m) => { *unnamed_query_string = m.query_string; - self.write_message(&BeMessage::ParseComplete)?; + self.write_message_noflush(&BeMessage::ParseComplete)?; } FeMessage::Describe(_) => { - self.write_message(&BeMessage::ParameterDescription)? - .write_message(&BeMessage::NoData)?; + self.write_message_noflush(&BeMessage::ParameterDescription)? + .write_message_noflush(&BeMessage::NoData)?; } FeMessage::Bind(_) => { - self.write_message(&BeMessage::BindComplete)?; + self.write_message_noflush(&BeMessage::BindComplete)?; } FeMessage::Close(_) => { - self.write_message(&BeMessage::CloseComplete)?; + self.write_message_noflush(&BeMessage::CloseComplete)?; } FeMessage::Execute(_) => { @@ -517,7 +519,7 @@ impl PostgresBackend { trace!("got execute {query_string:?}"); if let Err(e) = handler.process_query(self, query_string).await { log_query_error(query_string, &e); - self.write_message(&BeMessage::ErrorResponse( + self.write_message_noflush(&BeMessage::ErrorResponse( &e.to_string(), Some(e.pg_error_code()), ))?; @@ -529,7 +531,7 @@ impl PostgresBackend { } FeMessage::Sync => { - self.write_message(&BeMessage::ReadyForQuery)?; + self.write_message_noflush(&BeMessage::ReadyForQuery)?; } FeMessage::Terminate => { @@ -579,7 +581,7 @@ impl<'a> AsyncWrite for CopyDataWriter<'a> { // XXX: if the input is large, we should split it into multiple messages. // Not sure what the threshold should be, but the ultimate hard limit is that // the length cannot exceed u32. - this.pgb.write_message(&BeMessage::CopyData(buf))?; + this.pgb.write_message_noflush(&BeMessage::CopyData(buf))?; Poll::Ready(Ok(buf.len())) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 878928ae06..b362e25424 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -64,7 +64,7 @@ fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream { // We were requested to shut down. let msg = format!("pageserver is shutting down"); - let _ = pgb.write_message(&BeMessage::ErrorResponse(&msg, None)); + let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)); Err(QueryError::Other(anyhow::anyhow!(msg))) } @@ -80,13 +80,13 @@ fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream { let msg = "client terminated connection with Terminate message during COPY"; let query_error_error = QueryError::Disconnected(ConnectionError::Socket(io::Error::new(io::ErrorKind::ConnectionReset, msg))); - pgb.write_message(&BeMessage::ErrorResponse(msg, Some(query_error_error.pg_error_code())))?; + pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error_error.pg_error_code())))?; Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?; break; } m => { let msg = format!("unexpected message {m:?}"); - pgb.write_message(&BeMessage::ErrorResponse(&msg, None))?; + pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None))?; Err(io::Error::new(io::ErrorKind::Other, msg))?; break; } @@ -97,7 +97,7 @@ fn copyin_stream(pgb: &mut PostgresBackend) -> impl Stream { let msg = "client closed connection during COPY"; let query_error_error = QueryError::Disconnected(ConnectionError::Socket(io::Error::new(io::ErrorKind::ConnectionReset, msg))); - pgb.write_message(&BeMessage::ErrorResponse(msg, Some(query_error_error.pg_error_code())))?; + pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error_error.pg_error_code())))?; pgb.flush().await?; Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?; } @@ -311,7 +311,7 @@ impl PageServerHandler { let timeline = tenant.get_timeline(timeline_id, true)?; // switch client to COPYBOTH - pgb.write_message(&BeMessage::CopyBothResponse)?; + pgb.write_message_noflush(&BeMessage::CopyBothResponse)?; pgb.flush().await?; let metrics = PageRequestMetrics::new(&tenant_id, &timeline_id); @@ -380,7 +380,7 @@ impl PageServerHandler { }) }); - pgb.write_message(&BeMessage::CopyData(&response.serialize()))?; + pgb.write_message_noflush(&BeMessage::CopyData(&response.serialize()))?; pgb.flush().await?; } Ok(()) @@ -416,7 +416,7 @@ impl PageServerHandler { // Import basebackup provided via CopyData info!("importing basebackup"); - pgb.write_message(&BeMessage::CopyInResponse)?; + pgb.write_message_noflush(&BeMessage::CopyInResponse)?; pgb.flush().await?; let mut copyin_stream = Box::pin(copyin_stream(pgb)); @@ -468,7 +468,7 @@ impl PageServerHandler { // Import wal provided via CopyData info!("importing wal"); - pgb.write_message(&BeMessage::CopyInResponse)?; + pgb.write_message_noflush(&BeMessage::CopyInResponse)?; pgb.flush().await?; let mut copyin_stream = Box::pin(copyin_stream(pgb)); let mut reader = tokio_util::io::StreamReader::new(&mut copyin_stream); @@ -678,7 +678,7 @@ impl PageServerHandler { } // switch client to COPYOUT - pgb.write_message(&BeMessage::CopyOutResponse)?; + pgb.write_message_noflush(&BeMessage::CopyOutResponse)?; pgb.flush().await?; // Send a tarball of the latest layer on the timeline @@ -695,7 +695,7 @@ impl PageServerHandler { .await?; } - pgb.write_message(&BeMessage::CopyDone)?; + pgb.write_message_noflush(&BeMessage::CopyDone)?; pgb.flush().await?; info!("basebackup complete"); @@ -812,7 +812,7 @@ impl postgres_backend_async::Handler for PageServerHandler { // Check that the timeline exists self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false, ctx) .await?; - pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; + pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } // return pair of prev_lsn and last_lsn else if query_string.starts_with("get_last_record_rlsn ") { @@ -835,15 +835,15 @@ impl postgres_backend_async::Handler for PageServerHandler { let end_of_timeline = timeline.get_last_record_rlsn(); - pgb.write_message(&BeMessage::RowDescription(&[ + pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor::text_col(b"prev_lsn"), RowDescriptor::text_col(b"last_lsn"), ]))? - .write_message(&BeMessage::DataRow(&[ + .write_message_noflush(&BeMessage::DataRow(&[ Some(end_of_timeline.prev.to_string().as_bytes()), Some(end_of_timeline.last.to_string().as_bytes()), ]))? - .write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; + .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } // same as basebackup, but result includes relational data as well else if query_string.starts_with("fullbackup ") { @@ -884,7 +884,7 @@ impl postgres_backend_async::Handler for PageServerHandler { // Check that the timeline exists self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, prev_lsn, true, ctx) .await?; - pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; + pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.starts_with("import basebackup ") { // Import the `base` section (everything but the wal) of a basebackup. // Assumes the tenant already exists on this pageserver. @@ -929,10 +929,10 @@ impl postgres_backend_async::Handler for PageServerHandler { ) .await { - Ok(()) => pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?, + Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?, Err(e) => { error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}"); - pgb.write_message(&BeMessage::ErrorResponse( + pgb.write_message_noflush(&BeMessage::ErrorResponse( &e.to_string(), Some(e.pg_error_code()), ))? @@ -965,10 +965,10 @@ impl postgres_backend_async::Handler for PageServerHandler { .handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx) .await { - Ok(()) => pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?, + Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?, Err(e) => { error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}"); - pgb.write_message(&BeMessage::ErrorResponse( + pgb.write_message_noflush(&BeMessage::ErrorResponse( &e.to_string(), Some(e.pg_error_code()), ))? @@ -977,7 +977,7 @@ impl postgres_backend_async::Handler for PageServerHandler { } else if query_string.to_ascii_lowercase().starts_with("set ") { // important because psycopg2 executes "SET datestyle TO 'ISO'" // on connect - pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; + pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.starts_with("show ") { // show let (_, params_raw) = query_string.split_at("show ".len()); @@ -993,7 +993,7 @@ impl postgres_backend_async::Handler for PageServerHandler { self.check_permission(Some(tenant_id))?; let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; - pgb.write_message(&BeMessage::RowDescription(&[ + pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"checkpoint_distance"), RowDescriptor::int8_col(b"checkpoint_timeout"), RowDescriptor::int8_col(b"compaction_target_size"), @@ -1004,7 +1004,7 @@ impl postgres_backend_async::Handler for PageServerHandler { RowDescriptor::int8_col(b"image_creation_threshold"), RowDescriptor::int8_col(b"pitr_interval"), ]))? - .write_message(&BeMessage::DataRow(&[ + .write_message_noflush(&BeMessage::DataRow(&[ Some(tenant.get_checkpoint_distance().to_string().as_bytes()), Some( tenant @@ -1027,7 +1027,7 @@ impl postgres_backend_async::Handler for PageServerHandler { Some(tenant.get_image_creation_threshold().to_string().as_bytes()), Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()), ]))? - .write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; + .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; } else { return Err(QueryError::Other(anyhow::anyhow!( "unknown command {query_string}"