mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
Our rust-postgres fork is getting messy, mostly because proxy wants more
control over the raw protocol than tokio-postgres provides. As a result,
it keeps diverging further from upstream. Storage and compute also make
use of rust-postgres, but in a more conventional way, so they don't need
our crazy changes.
Idea:
* proxy maintains their subset
* other teams use a minimal patch set against upstream rust-postgres
Reviewing this code will be difficult. To implement it, I
1. Copied tokio-postgres, postgres-protocol and postgres-types from
00940fcdb5
2. Updated their package names with the `2` suffix to make them compile
in the workspace.
3. Updated proxy to use those packages
4. Copied in the code from tokio-postgres-rustls 0.13, with some patches
applied (https://github.com/jbg/tokio-postgres-rustls/pull/32 and
https://github.com/jbg/tokio-postgres-rustls/pull/33)
5. Removed as much dead code as I could find in the vendored libraries
6. Updated the tokio-postgres-rustls code to use our existing channel
binding implementation
110 lines
2.9 KiB
Rust
110 lines
2.9 KiB
Rust
use bytes::{Buf, Bytes, BytesMut};
|
|
use fallible_iterator::FallibleIterator;
|
|
use postgres_protocol2::message::backend;
|
|
use postgres_protocol2::message::frontend::CopyData;
|
|
use std::io;
|
|
use tokio_util::codec::{Decoder, Encoder};
|
|
|
|
pub enum FrontendMessage {
|
|
Raw(Bytes),
|
|
CopyData(CopyData<Box<dyn Buf + Send>>),
|
|
}
|
|
|
|
pub enum BackendMessage {
|
|
Normal {
|
|
messages: BackendMessages,
|
|
request_complete: bool,
|
|
},
|
|
Async(backend::Message),
|
|
}
|
|
|
|
pub struct BackendMessages(BytesMut);
|
|
|
|
impl BackendMessages {
|
|
pub fn empty() -> BackendMessages {
|
|
BackendMessages(BytesMut::new())
|
|
}
|
|
}
|
|
|
|
impl FallibleIterator for BackendMessages {
|
|
type Item = backend::Message;
|
|
type Error = io::Error;
|
|
|
|
fn next(&mut self) -> io::Result<Option<backend::Message>> {
|
|
backend::Message::parse(&mut self.0)
|
|
}
|
|
}
|
|
|
|
/// Codec that encodes [`FrontendMessage`]s and decodes [`BackendMessage`]s.
pub struct PostgresCodec {
    /// Optional upper bound on the size of a single backend message.
    ///
    /// When set, decoding fails with a "message too large" error for any
    /// message whose header length plus one exceeds this value. `None`
    /// disables the check.
    pub max_message_size: Option<usize>,
}
|
|
|
|
impl Encoder<FrontendMessage> for PostgresCodec {
|
|
type Error = io::Error;
|
|
|
|
fn encode(&mut self, item: FrontendMessage, dst: &mut BytesMut) -> io::Result<()> {
|
|
match item {
|
|
FrontendMessage::Raw(buf) => dst.extend_from_slice(&buf),
|
|
FrontendMessage::CopyData(data) => data.write(dst),
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl Decoder for PostgresCodec {
|
|
type Item = BackendMessage;
|
|
type Error = io::Error;
|
|
|
|
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<BackendMessage>, io::Error> {
|
|
let mut idx = 0;
|
|
let mut request_complete = false;
|
|
|
|
while let Some(header) = backend::Header::parse(&src[idx..])? {
|
|
let len = header.len() as usize + 1;
|
|
if src[idx..].len() < len {
|
|
break;
|
|
}
|
|
|
|
if let Some(max) = self.max_message_size {
|
|
if len > max {
|
|
return Err(io::Error::new(
|
|
io::ErrorKind::InvalidInput,
|
|
"message too large",
|
|
));
|
|
}
|
|
}
|
|
|
|
match header.tag() {
|
|
backend::NOTICE_RESPONSE_TAG
|
|
| backend::NOTIFICATION_RESPONSE_TAG
|
|
| backend::PARAMETER_STATUS_TAG => {
|
|
if idx == 0 {
|
|
let message = backend::Message::parse(src)?.unwrap();
|
|
return Ok(Some(BackendMessage::Async(message)));
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
_ => {}
|
|
}
|
|
|
|
idx += len;
|
|
|
|
if header.tag() == backend::READY_FOR_QUERY_TAG {
|
|
request_complete = true;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if idx == 0 {
|
|
Ok(None)
|
|
} else {
|
|
Ok(Some(BackendMessage::Normal {
|
|
messages: BackendMessages(src.split_to(idx)),
|
|
request_complete,
|
|
}))
|
|
}
|
|
}
|
|
}
|