mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 15:41:15 +00:00
Merge branch 'main' into devin/1745492468-add-dev-flag-pr11517
This commit is contained in:
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -4236,6 +4236,7 @@ name = "pagebench"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"camino",
|
||||
"clap",
|
||||
"futures",
|
||||
@@ -4244,12 +4245,15 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"pageserver_page_api",
|
||||
"rand 0.8.5",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tonic 0.13.1",
|
||||
"tracing",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
@@ -4305,6 +4309,7 @@ dependencies = [
|
||||
"hashlink",
|
||||
"hex",
|
||||
"hex-literal",
|
||||
"http 1.1.0",
|
||||
"http-utils",
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
@@ -4367,6 +4372,7 @@ dependencies = [
|
||||
"toml_edit",
|
||||
"tonic 0.13.1",
|
||||
"tonic-reflection",
|
||||
"tower 0.5.2",
|
||||
"tracing",
|
||||
"tracing-utils",
|
||||
"twox-hash",
|
||||
|
||||
@@ -1180,14 +1180,14 @@ RUN cd exts/rag && \
|
||||
RUN cd exts/rag_bge_small_en_v15 && \
|
||||
sed -i 's/pgrx = "0.14.1"/pgrx = { version = "0.14.1", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
ORT_LIB_LOCATION=/ext-src/onnxruntime-src/build/Linux \
|
||||
REMOTE_ONNX_URL=http://pg-ext-s3-gateway/pgrag-data/bge_small_en_v15.onnx \
|
||||
REMOTE_ONNX_URL=http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local/pgrag-data/bge_small_en_v15.onnx \
|
||||
cargo pgrx install --release --features remote_onnx && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/rag_bge_small_en_v15.control
|
||||
|
||||
RUN cd exts/rag_jina_reranker_v1_tiny_en && \
|
||||
sed -i 's/pgrx = "0.14.1"/pgrx = { version = "0.14.1", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
ORT_LIB_LOCATION=/ext-src/onnxruntime-src/build/Linux \
|
||||
REMOTE_ONNX_URL=http://pg-ext-s3-gateway/pgrag-data/jina_reranker_v1_tiny_en.onnx \
|
||||
REMOTE_ONNX_URL=http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local/pgrag-data/jina_reranker_v1_tiny_en.onnx \
|
||||
cargo pgrx install --release --features remote_onnx && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/rag_jina_reranker_v1_tiny_en.control
|
||||
|
||||
|
||||
@@ -1934,7 +1934,7 @@ pub enum PagestreamFeMessage {
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
#[derive(strum_macros::EnumProperty)]
|
||||
#[derive(Debug, strum_macros::EnumProperty)]
|
||||
pub enum PagestreamBeMessage {
|
||||
Exists(PagestreamExistsResponse),
|
||||
Nblocks(PagestreamNblocksResponse),
|
||||
@@ -2045,7 +2045,7 @@ pub enum PagestreamProtocolVersion {
|
||||
|
||||
pub type RequestId = u64;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamRequest {
|
||||
pub reqid: RequestId,
|
||||
pub request_lsn: Lsn,
|
||||
@@ -2064,7 +2064,7 @@ pub struct PagestreamNblocksRequest {
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamGetPageRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub rel: RelTag,
|
||||
|
||||
@@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize};
|
||||
// FIXME: should move 'forknum' as last field to keep this consistent with Postgres.
|
||||
// Then we could replace the custom Ord and PartialOrd implementations below with
|
||||
// deriving them. This will require changes in walredoproc.c.
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
|
||||
#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
|
||||
pub struct RelTag {
|
||||
pub forknum: u8,
|
||||
pub spcnode: Oid,
|
||||
@@ -184,12 +184,12 @@ pub enum SlruKind {
|
||||
MultiXactOffsets,
|
||||
}
|
||||
|
||||
impl SlruKind {
|
||||
pub fn to_str(&self) -> &'static str {
|
||||
impl fmt::Display for SlruKind {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Clog => "pg_xact",
|
||||
Self::MultiXactMembers => "pg_multixact/members",
|
||||
Self::MultiXactOffsets => "pg_multixact/offsets",
|
||||
Self::Clog => write!(f, "pg_xact"),
|
||||
Self::MultiXactMembers => write!(f, "pg_multixact/members"),
|
||||
Self::MultiXactOffsets => write!(f, "pg_multixact/offsets"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,6 +73,7 @@ pub mod error;
|
||||
/// async timeout helper
|
||||
pub mod timeout;
|
||||
|
||||
pub mod span;
|
||||
pub mod sync;
|
||||
|
||||
pub mod failpoint_support;
|
||||
|
||||
19
libs/utils/src/span.rs
Normal file
19
libs/utils/src/span.rs
Normal file
@@ -0,0 +1,19 @@
|
||||
//! Tracing span helpers.
|
||||
|
||||
/// Records the given fields in the current span, as a single call. The fields must already have
|
||||
/// been declared for the span (typically with empty values).
|
||||
#[macro_export]
|
||||
macro_rules! span_record {
|
||||
($($tokens:tt)*) => {$crate::span_record_in!(::tracing::Span::current(), $($tokens)*)};
|
||||
}
|
||||
|
||||
/// Records the given fields in the given span, as a single call. The fields must already have been
|
||||
/// declared for the span (typically with empty values).
|
||||
#[macro_export]
|
||||
macro_rules! span_record_in {
|
||||
($span:expr, $($tokens:tt)*) => {
|
||||
if let Some(meta) = $span.metadata() {
|
||||
$span.record_all(&tracing::valueset!(meta.fields(), $($tokens)*));
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -34,6 +34,7 @@ fail.workspace = true
|
||||
futures.workspace = true
|
||||
hashlink.workspace = true
|
||||
hex.workspace = true
|
||||
http.workspace = true
|
||||
http-utils.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
humantime.workspace = true
|
||||
@@ -93,6 +94,7 @@ tokio-util.workspace = true
|
||||
toml_edit = { workspace = true, features = [ "serde" ] }
|
||||
tonic.workspace = true
|
||||
tonic-reflection.workspace = true
|
||||
tower.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-utils.workspace = true
|
||||
url.workspace = true
|
||||
|
||||
@@ -10,6 +10,8 @@
|
||||
//!
|
||||
//! - Validate protocol invariants, via try_from() and try_into().
|
||||
|
||||
use std::fmt::Display;
|
||||
|
||||
use bytes::Bytes;
|
||||
use postgres_ffi::Oid;
|
||||
use smallvec::SmallVec;
|
||||
@@ -48,7 +50,8 @@ pub struct ReadLsn {
|
||||
pub request_lsn: Lsn,
|
||||
/// If given, the caller guarantees that the page has not been modified since this LSN. Must be
|
||||
/// smaller than or equal to request_lsn. This allows the Pageserver to serve an old page
|
||||
/// without waiting for the request LSN to arrive. Valid for all request types.
|
||||
/// without waiting for the request LSN to arrive. If not given, the request will read at the
|
||||
/// request_lsn and wait for it to arrive if necessary. Valid for all request types.
|
||||
///
|
||||
/// It is undefined behaviour to make a request such that the page was, in fact, modified
|
||||
/// between request_lsn and not_modified_since_lsn. The Pageserver might detect it and return an
|
||||
@@ -58,6 +61,17 @@ pub struct ReadLsn {
|
||||
pub not_modified_since_lsn: Option<Lsn>,
|
||||
}
|
||||
|
||||
impl Display for ReadLsn {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let req_lsn = self.request_lsn;
|
||||
if let Some(mod_lsn) = self.not_modified_since_lsn {
|
||||
write!(f, "{req_lsn}>={mod_lsn}")
|
||||
} else {
|
||||
req_lsn.fmt(f)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReadLsn {
|
||||
/// Validates the ReadLsn.
|
||||
pub fn validate(&self) -> Result<(), ProtocolError> {
|
||||
@@ -584,6 +598,7 @@ impl TryFrom<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(segment: GetSlruSegmentResponse) -> Result<Self, Self::Error> {
|
||||
// TODO: can a segment legitimately be empty?
|
||||
if segment.is_empty() {
|
||||
return Err(ProtocolError::Missing("segment"));
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-trait.workspace = true
|
||||
camino.workspace = true
|
||||
clap.workspace = true
|
||||
futures.workspace = true
|
||||
@@ -15,14 +16,17 @@ hdrhistogram.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest.workspace=true
|
||||
reqwest.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tracing.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tonic.workspace = true
|
||||
|
||||
pageserver_client.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_page_api.workspace = true
|
||||
utils = { path = "../../libs/utils/" }
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
@@ -7,11 +7,15 @@ use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::Context;
|
||||
use async_trait::async_trait;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::KeySpaceAccum;
|
||||
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
|
||||
use pageserver_api::models::{
|
||||
PagestreamGetPageRequest, PagestreamGetPageResponse, PagestreamRequest,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_page_api::proto;
|
||||
use rand::prelude::*;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -22,6 +26,12 @@ use utils::lsn::Lsn;
|
||||
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
|
||||
use crate::util::{request_stats, tokio_thread_local_stats};
|
||||
|
||||
#[derive(clap::ValueEnum, Clone, Debug)]
|
||||
enum Protocol {
|
||||
Libpq,
|
||||
Grpc,
|
||||
}
|
||||
|
||||
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
|
||||
#[derive(clap::Parser)]
|
||||
pub(crate) struct Args {
|
||||
@@ -35,6 +45,8 @@ pub(crate) struct Args {
|
||||
num_clients: NonZeroUsize,
|
||||
#[clap(long)]
|
||||
runtime: Option<humantime::Duration>,
|
||||
#[clap(long, value_enum, default_value = "libpq")]
|
||||
protocol: Protocol,
|
||||
/// Each client sends requests at the given rate.
|
||||
///
|
||||
/// If a request takes too long and we should be issuing a new request already,
|
||||
@@ -303,7 +315,20 @@ async fn main_impl(
|
||||
.unwrap();
|
||||
|
||||
Box::pin(async move {
|
||||
client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
|
||||
let client: Box<dyn Client> = match args.protocol {
|
||||
Protocol::Libpq => Box::new(
|
||||
LibpqClient::new(args.page_service_connstring.clone(), worker_id.timeline)
|
||||
.await
|
||||
.unwrap(),
|
||||
),
|
||||
|
||||
Protocol::Grpc => Box::new(
|
||||
GrpcClient::new(args.page_service_connstring.clone(), worker_id.timeline)
|
||||
.await
|
||||
.unwrap(),
|
||||
),
|
||||
};
|
||||
run_worker(args, client, ss, cancel, rps_period, ranges, weights).await
|
||||
})
|
||||
};
|
||||
|
||||
@@ -355,23 +380,15 @@ async fn main_impl(
|
||||
anyhow::Ok(())
|
||||
}
|
||||
|
||||
async fn client_libpq(
|
||||
async fn run_worker(
|
||||
args: &Args,
|
||||
worker_id: WorkerId,
|
||||
mut client: Box<dyn Client>,
|
||||
shared_state: Arc<SharedState>,
|
||||
cancel: CancellationToken,
|
||||
rps_period: Option<Duration>,
|
||||
ranges: Vec<KeyRange>,
|
||||
weights: rand::distributions::weighted::WeightedIndex<i128>,
|
||||
) {
|
||||
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut client = client
|
||||
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
shared_state.start_work_barrier.wait().await;
|
||||
let client_start = Instant::now();
|
||||
let mut ticks_processed = 0;
|
||||
@@ -415,12 +432,12 @@ async fn client_libpq(
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
client.getpage_send(req).await.unwrap();
|
||||
client.send_get_page(req).await.unwrap();
|
||||
inflight.push_back(start);
|
||||
}
|
||||
|
||||
let start = inflight.pop_front().unwrap();
|
||||
client.getpage_recv().await.unwrap();
|
||||
client.recv_get_page().await.unwrap();
|
||||
let end = Instant::now();
|
||||
shared_state.live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
@@ -442,3 +459,104 @@ async fn client_libpq(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A benchmark client, to allow switching out the transport protocol.
|
||||
///
|
||||
/// For simplicity, this just uses separate asynchronous send/recv methods. The send method could
|
||||
/// return a future that resolves when the response is received, but we don't really need it.
|
||||
#[async_trait]
|
||||
trait Client: Send {
|
||||
/// Sends an asynchronous GetPage request to the pageserver.
|
||||
async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()>;
|
||||
|
||||
/// Receives the next GetPage response from the pageserver.
|
||||
async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse>;
|
||||
}
|
||||
|
||||
/// A libpq-based Pageserver client.
|
||||
struct LibpqClient {
|
||||
inner: pageserver_client::page_service::PagestreamClient,
|
||||
}
|
||||
|
||||
impl LibpqClient {
|
||||
async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result<Self> {
|
||||
let inner = pageserver_client::page_service::Client::new(connstring)
|
||||
.await?
|
||||
.pagestream(ttid.tenant_id, ttid.timeline_id)
|
||||
.await?;
|
||||
Ok(Self { inner })
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Client for LibpqClient {
|
||||
async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
|
||||
self.inner.getpage_send(req).await
|
||||
}
|
||||
|
||||
async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
self.inner.getpage_recv().await
|
||||
}
|
||||
}
|
||||
|
||||
/// A gRPC client using the raw, no-frills gRPC client.
|
||||
struct GrpcClient {
|
||||
req_tx: tokio::sync::mpsc::Sender<proto::GetPageRequest>,
|
||||
resp_rx: tonic::Streaming<proto::GetPageResponse>,
|
||||
}
|
||||
|
||||
impl GrpcClient {
|
||||
async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result<Self> {
|
||||
let mut client = pageserver_page_api::proto::PageServiceClient::connect(connstring).await?;
|
||||
|
||||
// The channel has a buffer size of 1, since 0 is not allowed. It does not matter, since the
|
||||
// benchmark will control the queue depth (i.e. in-flight requests) anyway, and requests are
|
||||
// buffered by Tonic and the OS too.
|
||||
let (req_tx, req_rx) = tokio::sync::mpsc::channel(1);
|
||||
let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx);
|
||||
let mut req = tonic::Request::new(req_stream);
|
||||
let metadata = req.metadata_mut();
|
||||
metadata.insert("neon-tenant-id", ttid.tenant_id.to_string().try_into()?);
|
||||
metadata.insert("neon-timeline-id", ttid.timeline_id.to_string().try_into()?);
|
||||
metadata.insert("neon-shard-id", "0000".try_into()?);
|
||||
|
||||
let resp = client.get_pages(req).await?;
|
||||
let resp_stream = resp.into_inner();
|
||||
|
||||
Ok(Self {
|
||||
req_tx,
|
||||
resp_rx: resp_stream,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Client for GrpcClient {
|
||||
async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
|
||||
let req = proto::GetPageRequest {
|
||||
request_id: 0,
|
||||
request_class: proto::GetPageClass::Normal as i32,
|
||||
read_lsn: Some(proto::ReadLsn {
|
||||
request_lsn: req.hdr.request_lsn.0,
|
||||
not_modified_since_lsn: req.hdr.not_modified_since.0,
|
||||
}),
|
||||
rel: Some(req.rel.into()),
|
||||
block_number: vec![req.blkno],
|
||||
};
|
||||
self.req_tx.send(req).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
let resp = self.resp_rx.message().await?.unwrap();
|
||||
anyhow::ensure!(
|
||||
resp.status_code == proto::GetPageStatusCode::Ok as i32,
|
||||
"unexpected status code: {}",
|
||||
resp.status_code
|
||||
);
|
||||
Ok(PagestreamGetPageResponse {
|
||||
page: resp.page_image[0].clone(),
|
||||
req: PagestreamGetPageRequest::default(), // dummy
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,6 +65,30 @@ impl From<GetVectoredError> for BasebackupError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BasebackupError> for postgres_backend::QueryError {
|
||||
fn from(err: BasebackupError) -> Self {
|
||||
use postgres_backend::QueryError;
|
||||
use pq_proto::framed::ConnectionError;
|
||||
match err {
|
||||
BasebackupError::Client(err, _) => QueryError::Disconnected(ConnectionError::Io(err)),
|
||||
BasebackupError::Server(err) => QueryError::Other(err),
|
||||
BasebackupError::Shutdown => QueryError::Shutdown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BasebackupError> for tonic::Status {
|
||||
fn from(err: BasebackupError) -> Self {
|
||||
use tonic::Code;
|
||||
let code = match &err {
|
||||
BasebackupError::Client(_, _) => Code::Cancelled,
|
||||
BasebackupError::Server(_) => Code::Internal,
|
||||
BasebackupError::Shutdown => Code::Unavailable,
|
||||
};
|
||||
tonic::Status::new(code, err.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
/// Create basebackup with non-rel data in it.
|
||||
/// Only include relational data if 'full_backup' is true.
|
||||
///
|
||||
@@ -248,7 +272,7 @@ where
|
||||
async fn flush(&mut self) -> Result<(), BasebackupError> {
|
||||
let nblocks = self.buf.len() / BLCKSZ as usize;
|
||||
let (kind, segno) = self.current_segment.take().unwrap();
|
||||
let segname = format!("{}/{:>04X}", kind.to_str(), segno);
|
||||
let segname = format!("{kind}/{segno:>04X}");
|
||||
let header = new_tar_header(&segname, self.buf.len() as u64)?;
|
||||
self.ar
|
||||
.append(&header, self.buf.as_slice())
|
||||
|
||||
@@ -817,7 +817,7 @@ fn start_pageserver(
|
||||
} else {
|
||||
None
|
||||
},
|
||||
basebackup_cache.clone(),
|
||||
basebackup_cache,
|
||||
);
|
||||
|
||||
// Spawn a Pageserver gRPC server task. It will spawn separate tasks for
|
||||
@@ -829,12 +829,10 @@ fn start_pageserver(
|
||||
let mut page_service_grpc = None;
|
||||
if let Some(grpc_listener) = grpc_listener {
|
||||
page_service_grpc = Some(page_service::spawn_grpc(
|
||||
conf,
|
||||
tenant_manager.clone(),
|
||||
grpc_auth,
|
||||
otel_guard.as_ref().map(|g| g.dispatch.clone()),
|
||||
grpc_listener,
|
||||
basebackup_cache,
|
||||
)?);
|
||||
}
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -471,8 +471,19 @@ impl Timeline {
|
||||
|
||||
let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
|
||||
|
||||
if rels.is_empty() {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
// Pre-deserialize the rel directory to avoid duplicated work in `get_relsize_cached`.
|
||||
let reldir_key = rel_dir_to_key(spcnode, dbnode);
|
||||
let buf = version.get(self, reldir_key, ctx).await?;
|
||||
let reldir = RelDirectory::des(&buf)?;
|
||||
|
||||
for rel in rels {
|
||||
let n_blocks = self.get_rel_size(rel, version, ctx).await?;
|
||||
let n_blocks = self
|
||||
.get_rel_size_in_reldir(rel, version, Some((reldir_key, &reldir)), ctx)
|
||||
.await?;
|
||||
total_blocks += n_blocks as usize;
|
||||
}
|
||||
Ok(total_blocks)
|
||||
@@ -487,6 +498,19 @@ impl Timeline {
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockNumber, PageReconstructError> {
|
||||
self.get_rel_size_in_reldir(tag, version, None, ctx).await
|
||||
}
|
||||
|
||||
/// Get size of a relation file. The relation must exist, otherwise an error is returned.
|
||||
///
|
||||
/// See [`Self::get_rel_exists_in_reldir`] on why we need `deserialized_reldir_v1`.
|
||||
pub(crate) async fn get_rel_size_in_reldir(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockNumber, PageReconstructError> {
|
||||
if tag.relnode == 0 {
|
||||
return Err(PageReconstructError::Other(
|
||||
@@ -499,7 +523,9 @@ impl Timeline {
|
||||
}
|
||||
|
||||
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
|
||||
&& !self.get_rel_exists(tag, version, ctx).await?
|
||||
&& !self
|
||||
.get_rel_exists_in_reldir(tag, version, deserialized_reldir_v1, ctx)
|
||||
.await?
|
||||
{
|
||||
// FIXME: Postgres sometimes calls smgrcreate() to create
|
||||
// FSM, and smgrnblocks() on it immediately afterwards,
|
||||
@@ -521,11 +547,28 @@ impl Timeline {
|
||||
///
|
||||
/// Only shard 0 has a full view of the relations. Other shards only know about relations that
|
||||
/// the shard stores pages for.
|
||||
///
|
||||
pub(crate) async fn get_rel_exists(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, PageReconstructError> {
|
||||
self.get_rel_exists_in_reldir(tag, version, None, ctx).await
|
||||
}
|
||||
|
||||
/// Does the relation exist? With a cached deserialized `RelDirectory`.
|
||||
///
|
||||
/// There are some cases where the caller loops across all relations. In that specific case,
|
||||
/// the caller should obtain the deserialized `RelDirectory` first and then call this function
|
||||
/// to avoid duplicated work of deserliazation. This is a hack and should be removed by introducing
|
||||
/// a new API (e.g., `get_rel_exists_batched`).
|
||||
pub(crate) async fn get_rel_exists_in_reldir(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
deserialized_reldir_v1: Option<(Key, &RelDirectory)>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, PageReconstructError> {
|
||||
if tag.relnode == 0 {
|
||||
return Err(PageReconstructError::Other(
|
||||
@@ -568,6 +611,17 @@ impl Timeline {
|
||||
// fetch directory listing (old)
|
||||
|
||||
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
|
||||
|
||||
if let Some((cached_key, dir)) = deserialized_reldir_v1 {
|
||||
if cached_key == key {
|
||||
return Ok(dir.rels.contains(&(tag.relnode, tag.forknum)));
|
||||
} else if cfg!(test) || cfg!(feature = "testing") {
|
||||
panic!("cached reldir key mismatch: {cached_key} != {key}");
|
||||
} else {
|
||||
warn!("cached reldir key mismatch: {cached_key} != {key}");
|
||||
}
|
||||
// Fallback to reading the directory from the datadir.
|
||||
}
|
||||
let buf = version.get(self, key, ctx).await?;
|
||||
|
||||
let dir = RelDirectory::des(&buf)?;
|
||||
|
||||
@@ -950,6 +950,18 @@ pub(crate) enum WaitLsnError {
|
||||
Timeout(String),
|
||||
}
|
||||
|
||||
impl From<WaitLsnError> for tonic::Status {
|
||||
fn from(err: WaitLsnError) -> Self {
|
||||
use tonic::Code;
|
||||
let code = match &err {
|
||||
WaitLsnError::Timeout(_) => Code::Internal,
|
||||
WaitLsnError::BadState(_) => Code::Internal,
|
||||
WaitLsnError::Shutdown => Code::Unavailable,
|
||||
};
|
||||
tonic::Status::new(code, err.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
// The impls below achieve cancellation mapping for errors.
|
||||
// Perhaps there's a way of achieving this with less cruft.
|
||||
|
||||
|
||||
@@ -25,19 +25,15 @@ pub(super) async fn authenticate(
|
||||
}
|
||||
AuthSecret::Scram(secret) => {
|
||||
debug!("auth endpoint chooses SCRAM");
|
||||
let scram = auth::Scram(&secret, ctx);
|
||||
|
||||
let auth_outcome = tokio::time::timeout(config.scram_protocol_timeout, async {
|
||||
AuthFlow::new(client, scram)
|
||||
.authenticate()
|
||||
.await
|
||||
.inspect_err(|error| {
|
||||
warn!(?error, "error processing scram messages");
|
||||
})
|
||||
})
|
||||
let auth_outcome = tokio::time::timeout(
|
||||
config.scram_protocol_timeout,
|
||||
AuthFlow::new(client, auth::Scram(&secret, ctx)).authenticate(),
|
||||
)
|
||||
.await
|
||||
.inspect_err(|_| warn!("error processing scram messages error = authentication timed out, execution time exceeded {} seconds", config.scram_protocol_timeout.as_secs()))
|
||||
.map_err(auth::AuthError::user_timeout)??;
|
||||
.map_err(auth::AuthError::user_timeout)?
|
||||
.inspect_err(|error| warn!(?error, "error processing scram messages"))?;
|
||||
|
||||
let client_key = match auth_outcome {
|
||||
sasl::Outcome::Success(key) => key,
|
||||
|
||||
@@ -159,7 +159,7 @@ pub async fn task_main(
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
|
||||
config: &'static ProxyConfig,
|
||||
backend: &'static ConsoleRedirectBackend,
|
||||
ctx: &RequestContext,
|
||||
|
||||
@@ -7,7 +7,9 @@ use std::time::Duration;
|
||||
|
||||
use ::http::HeaderName;
|
||||
use ::http::header::AUTHORIZATION;
|
||||
use bytes::Bytes;
|
||||
use futures::TryFutureExt;
|
||||
use hyper::StatusCode;
|
||||
use postgres_client::config::SslMode;
|
||||
use tokio::time::Instant;
|
||||
use tracing::{Instrument, debug, info, info_span, warn};
|
||||
@@ -72,28 +74,34 @@ impl NeonControlPlaneClient {
|
||||
role: &RoleName,
|
||||
) -> Result<AuthInfo, GetAuthInfoError> {
|
||||
async {
|
||||
let request = self
|
||||
.endpoint
|
||||
.get_path("get_endpoint_access_control")
|
||||
.header(X_REQUEST_ID, ctx.session_id().to_string())
|
||||
.header(AUTHORIZATION, format!("Bearer {}", &self.jwt))
|
||||
.query(&[("session_id", ctx.session_id())])
|
||||
.query(&[
|
||||
("application_name", ctx.console_application_name().as_str()),
|
||||
("endpointish", endpoint.as_str()),
|
||||
("role", role.as_str()),
|
||||
])
|
||||
.build()?;
|
||||
|
||||
debug!(url = request.url().as_str(), "sending http request");
|
||||
let start = Instant::now();
|
||||
let response = {
|
||||
let _pause = ctx.latency_timer_pause_at(start, crate::metrics::Waiting::Cplane);
|
||||
self.endpoint.execute(request).await?
|
||||
};
|
||||
info!(duration = ?start.elapsed(), "received http response");
|
||||
let request = self
|
||||
.endpoint
|
||||
.get_path("get_endpoint_access_control")
|
||||
.header(X_REQUEST_ID, ctx.session_id().to_string())
|
||||
.header(AUTHORIZATION, format!("Bearer {}", &self.jwt))
|
||||
.query(&[("session_id", ctx.session_id())])
|
||||
.query(&[
|
||||
("application_name", ctx.console_application_name().as_str()),
|
||||
("endpointish", endpoint.as_str()),
|
||||
("role", role.as_str()),
|
||||
])
|
||||
.build()?;
|
||||
|
||||
let body = match parse_body::<GetEndpointAccessControl>(response).await {
|
||||
debug!(url = request.url().as_str(), "sending http request");
|
||||
let start = Instant::now();
|
||||
let _pause = ctx.latency_timer_pause_at(start, crate::metrics::Waiting::Cplane);
|
||||
let response = self.endpoint.execute(request).await?;
|
||||
|
||||
info!(duration = ?start.elapsed(), "received http response");
|
||||
|
||||
response
|
||||
};
|
||||
|
||||
let body = match parse_body::<GetEndpointAccessControl>(
|
||||
response.status(),
|
||||
response.bytes().await?,
|
||||
) {
|
||||
Ok(body) => body,
|
||||
// Error 404 is special: it's ok not to have a secret.
|
||||
// TODO(anna): retry
|
||||
@@ -184,7 +192,10 @@ impl NeonControlPlaneClient {
|
||||
drop(pause);
|
||||
info!(duration = ?start.elapsed(), "received http response");
|
||||
|
||||
let body = parse_body::<EndpointJwksResponse>(response).await?;
|
||||
let body = parse_body::<EndpointJwksResponse>(
|
||||
response.status(),
|
||||
response.bytes().await.map_err(ControlPlaneError::from)?,
|
||||
)?;
|
||||
|
||||
let rules = body
|
||||
.jwks
|
||||
@@ -236,7 +247,7 @@ impl NeonControlPlaneClient {
|
||||
let response = self.endpoint.execute(request).await?;
|
||||
drop(pause);
|
||||
info!(duration = ?start.elapsed(), "received http response");
|
||||
let body = parse_body::<WakeCompute>(response).await?;
|
||||
let body = parse_body::<WakeCompute>(response.status(), response.bytes().await?)?;
|
||||
|
||||
// Unfortunately, ownership won't let us use `Option::ok_or` here.
|
||||
let (host, port) = match parse_host_port(&body.address) {
|
||||
@@ -487,33 +498,33 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
|
||||
}
|
||||
|
||||
/// Parse http response body, taking status code into account.
|
||||
async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
|
||||
response: http::Response,
|
||||
fn parse_body<T: for<'a> serde::Deserialize<'a>>(
|
||||
status: StatusCode,
|
||||
body: Bytes,
|
||||
) -> Result<T, ControlPlaneError> {
|
||||
let status = response.status();
|
||||
if status.is_success() {
|
||||
// We shouldn't log raw body because it may contain secrets.
|
||||
info!("request succeeded, processing the body");
|
||||
return Ok(response.json().await?);
|
||||
return Ok(serde_json::from_slice(&body).map_err(std::io::Error::other)?);
|
||||
}
|
||||
let s = response.bytes().await?;
|
||||
|
||||
// Log plaintext to be able to detect, whether there are some cases not covered by the error struct.
|
||||
info!("response_error plaintext: {:?}", s);
|
||||
info!("response_error plaintext: {:?}", body);
|
||||
|
||||
// Don't throw an error here because it's not as important
|
||||
// as the fact that the request itself has failed.
|
||||
let mut body = serde_json::from_slice(&s).unwrap_or_else(|e| {
|
||||
let mut body = serde_json::from_slice(&body).unwrap_or_else(|e| {
|
||||
warn!("failed to parse error body: {e}");
|
||||
ControlPlaneErrorMessage {
|
||||
Box::new(ControlPlaneErrorMessage {
|
||||
error: "reason unclear (malformed error message)".into(),
|
||||
http_status_code: status,
|
||||
status: None,
|
||||
}
|
||||
})
|
||||
});
|
||||
body.http_status_code = status;
|
||||
|
||||
warn!("console responded with an error ({status}): {body:?}");
|
||||
Err(ControlPlaneError::Message(Box::new(body)))
|
||||
Err(ControlPlaneError::Message(body))
|
||||
}
|
||||
|
||||
fn parse_host_port(input: &str) -> Option<(&str, u16)> {
|
||||
|
||||
@@ -4,9 +4,10 @@
|
||||
|
||||
pub mod health_server;
|
||||
|
||||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::FutureExt;
|
||||
use http::Method;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::body::Body;
|
||||
@@ -109,15 +110,31 @@ impl Endpoint {
|
||||
}
|
||||
|
||||
/// Execute a [request](reqwest::Request).
|
||||
pub(crate) async fn execute(&self, request: Request) -> Result<Response, Error> {
|
||||
let _timer = Metrics::get()
|
||||
pub(crate) fn execute(
|
||||
&self,
|
||||
request: Request,
|
||||
) -> impl Future<Output = Result<Response, Error>> {
|
||||
let metric = Metrics::get()
|
||||
.proxy
|
||||
.console_request_latency
|
||||
.start_timer(ConsoleRequest {
|
||||
.with_labels(ConsoleRequest {
|
||||
request: request.url().path(),
|
||||
});
|
||||
|
||||
self.client.execute(request).await
|
||||
let req = self.client.execute(request).boxed();
|
||||
|
||||
async move {
|
||||
let start = Instant::now();
|
||||
scopeguard::defer!({
|
||||
Metrics::get()
|
||||
.proxy
|
||||
.console_request_latency
|
||||
.get_metric(metric)
|
||||
.observe_duration_since(start);
|
||||
});
|
||||
|
||||
req.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -186,7 +186,7 @@ where
|
||||
pub async fn read_message<'a, S>(
|
||||
stream: &mut S,
|
||||
buf: &'a mut Vec<u8>,
|
||||
max: usize,
|
||||
max: u32,
|
||||
) -> io::Result<(u8, &'a mut [u8])>
|
||||
where
|
||||
S: AsyncRead + Unpin,
|
||||
@@ -206,7 +206,7 @@ where
|
||||
let header = read!(stream => Header);
|
||||
|
||||
// as described above, the length must be at least 4.
|
||||
let Some(len) = (header.len.get() as usize).checked_sub(4) else {
|
||||
let Some(len) = header.len.get().checked_sub(4) else {
|
||||
return Err(io::Error::other(format!(
|
||||
"invalid startup message length {}, must be at least 4.",
|
||||
header.len,
|
||||
@@ -222,7 +222,7 @@ where
|
||||
}
|
||||
|
||||
// read in our entire message.
|
||||
buf.resize(len, 0);
|
||||
buf.resize(len as usize, 0);
|
||||
stream.read_exact(buf).await?;
|
||||
|
||||
Ok((header.tag, buf))
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use futures::{FutureExt, TryFutureExt};
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::{debug, info, warn};
|
||||
@@ -57,7 +58,7 @@ pub(crate) enum HandshakeData<S> {
|
||||
/// It's easier to work with owned `stream` here as we need to upgrade it to TLS;
|
||||
/// we also take an extra care of propagating only the select handshake errors to client.
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin + Send>(
|
||||
ctx: &RequestContext,
|
||||
stream: S,
|
||||
mut tls: Option<&TlsConfig>,
|
||||
@@ -108,7 +109,9 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
})
|
||||
.map_ok(Box::new)
|
||||
.boxed();
|
||||
|
||||
res?;
|
||||
|
||||
@@ -146,7 +149,7 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
tls.cert_resolver.resolve(conn_info.server_name());
|
||||
|
||||
let tls = Stream::Tls {
|
||||
tls: Box::new(tls_stream),
|
||||
tls: tls_stream,
|
||||
tls_server_end_point,
|
||||
};
|
||||
(stream, msg) = PqStream::parse_startup(tls).await?;
|
||||
|
||||
@@ -270,7 +270,7 @@ impl ReportableError for ClientRequestError {
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
|
||||
config: &'static ProxyConfig,
|
||||
auth_backend: &'static auth::Backend<'static, ()>,
|
||||
ctx: &RequestContext,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use futures::FutureExt;
|
||||
use smol_str::SmolStr;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::debug;
|
||||
@@ -89,6 +90,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
|
||||
.compute
|
||||
.cancel_closure
|
||||
.try_cancel_query(compute_config)
|
||||
.boxed()
|
||||
.await
|
||||
{
|
||||
tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
|
||||
|
||||
@@ -30,52 +30,53 @@ where
|
||||
F: FnOnce(&str) -> super::Result<M>,
|
||||
M: Mechanism,
|
||||
{
|
||||
let sasl = {
|
||||
let (mut mechanism, mut input) = {
|
||||
// pause the timer while we communicate with the client
|
||||
let _paused = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
|
||||
|
||||
// Initial client message contains the chosen auth method's name.
|
||||
let msg = stream.read_password_message().await?;
|
||||
super::FirstMessage::parse(msg).ok_or(super::Error::BadClientMessage("bad sasl message"))?
|
||||
|
||||
let sasl = super::FirstMessage::parse(msg)
|
||||
.ok_or(super::Error::BadClientMessage("bad sasl message"))?;
|
||||
|
||||
(mechanism(sasl.method)?, sasl.message)
|
||||
};
|
||||
|
||||
let mut mechanism = mechanism(sasl.method)?;
|
||||
let mut input = sasl.message;
|
||||
loop {
|
||||
let step = mechanism
|
||||
.exchange(input)
|
||||
.inspect_err(|error| tracing::info!(?error, "error during SASL exchange"))?;
|
||||
|
||||
match step {
|
||||
Step::Continue(moved_mechanism, reply) => {
|
||||
match mechanism.exchange(input) {
|
||||
Ok(Step::Continue(moved_mechanism, reply)) => {
|
||||
mechanism = moved_mechanism;
|
||||
|
||||
// pause the timer while we communicate with the client
|
||||
let _paused = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
|
||||
|
||||
// write reply
|
||||
let sasl_msg = BeAuthenticationSaslMessage::Continue(reply.as_bytes());
|
||||
stream.write_message(BeMessage::AuthenticationSasl(sasl_msg));
|
||||
|
||||
// get next input
|
||||
stream.flush().await?;
|
||||
let msg = stream.read_password_message().await?;
|
||||
input = std::str::from_utf8(msg)
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "bad encoding"))?;
|
||||
drop(reply);
|
||||
}
|
||||
Step::Success(result, reply) => {
|
||||
// pause the timer while we communicate with the client
|
||||
let _paused = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
|
||||
|
||||
Ok(Step::Success(result, reply)) => {
|
||||
// write reply
|
||||
let sasl_msg = BeAuthenticationSaslMessage::Final(reply.as_bytes());
|
||||
stream.write_message(BeMessage::AuthenticationSasl(sasl_msg));
|
||||
stream.write_message(BeMessage::AuthenticationOk);
|
||||
|
||||
// exit with success
|
||||
break Ok(Outcome::Success(result));
|
||||
}
|
||||
// exit with failure
|
||||
Step::Failure(reason) => break Ok(Outcome::Failure(reason)),
|
||||
Ok(Step::Failure(reason)) => break Ok(Outcome::Failure(reason)),
|
||||
Err(error) => {
|
||||
tracing::info!(?error, "error during SASL exchange");
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
|
||||
// pause the timer while we communicate with the client
|
||||
let _paused = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
|
||||
|
||||
// get next input
|
||||
stream.flush().await?;
|
||||
let msg = stream.read_password_message().await?;
|
||||
input = std::str::from_utf8(msg)
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "bad encoding"))?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,7 +72,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> PqStream<S> {
|
||||
impl<S: AsyncRead + Unpin> PqStream<S> {
|
||||
/// Read a raw postgres packet, which will respect the max length requested.
|
||||
/// This is not cancel safe.
|
||||
async fn read_raw_expect(&mut self, tag: u8, max: usize) -> io::Result<&mut [u8]> {
|
||||
async fn read_raw_expect(&mut self, tag: u8, max: u32) -> io::Result<&mut [u8]> {
|
||||
let (actual_tag, msg) = read_message(&mut self.stream, &mut self.read, max).await?;
|
||||
if actual_tag != tag {
|
||||
return Err(io::Error::other(format!(
|
||||
@@ -89,7 +89,7 @@ impl<S: AsyncRead + Unpin> PqStream<S> {
|
||||
// passwords are usually pretty short
|
||||
// and SASL SCRAM messages are no longer than 256 bytes in my testing
|
||||
// (a few hashes and random bytes, encoded into base64).
|
||||
const MAX_PASSWORD_LENGTH: usize = 512;
|
||||
const MAX_PASSWORD_LENGTH: u32 = 512;
|
||||
self.read_raw_expect(FE_PASSWORD_MESSAGE, MAX_PASSWORD_LENGTH)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -31,7 +31,9 @@ mod private {
|
||||
type Output = io::Result<RustlsStream<S>>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
Pin::new(&mut self.inner).poll(cx).map_ok(RustlsStream)
|
||||
Pin::new(&mut self.inner)
|
||||
.poll(cx)
|
||||
.map_ok(|s| RustlsStream(Box::new(s)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,7 +59,7 @@ mod private {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RustlsStream<S>(TlsStream<S>);
|
||||
pub struct RustlsStream<S>(Box<TlsStream<S>>);
|
||||
|
||||
impl<S> postgres_client::tls::TlsStream for RustlsStream<S>
|
||||
where
|
||||
|
||||
Reference in New Issue
Block a user