diff --git a/libs/pq_proto/src/codec.rs b/libs/pq_proto/src/codec.rs new file mode 100644 index 0000000000..1cb7c1a935 --- /dev/null +++ b/libs/pq_proto/src/codec.rs @@ -0,0 +1,62 @@ +//! Provides `PostgresCodec` defining how to serilize/deserialize Postgres +//! messages to/from the wire, to be used with `tokio_util::codec::Framed`. +use std::io; + +use bytes::BytesMut; +use tokio_util::codec::{Decoder, Encoder}; + +use crate::{BeMessage, FeMessage, FeStartupPacket, ProtocolError}; + +// Defines how to serilize/deserialize Postgres messages to/from the wire, to be +// used with `tokio_util::codec::Framed`. +pub struct PostgresCodec { + // Have we already decoded startup message? All further should start with + // message type byte then. + startup_read: bool, +} + +impl PostgresCodec { + pub fn new() -> Self { + PostgresCodec { + startup_read: false, + } + } +} + +/// Error on postgres connection: either IO (physical transport error) or +/// protocol violation. +#[derive(thiserror::Error, Debug)] +pub enum ConnectionError { + #[error(transparent)] + Io(#[from] io::Error), + #[error(transparent)] + Protocol(#[from] ProtocolError), +} + +impl Encoder<&BeMessage<'_>> for PostgresCodec { + type Error = ConnectionError; + + fn encode(&mut self, item: &BeMessage, dst: &mut BytesMut) -> Result<(), ConnectionError> { + BeMessage::write(dst, &item)?; + Ok(()) + } +} + +impl Decoder for PostgresCodec { + type Item = FeMessage; + type Error = ConnectionError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, ConnectionError> { + let msg = if !self.startup_read { + let msg = FeStartupPacket::parse(src); + if let Ok(Some(FeMessage::StartupPacket(FeStartupPacket::StartupMessage { .. }))) = msg + { + self.startup_read = true; + } + msg? + } else { + FeMessage::parse(src)? + }; + Ok(msg) + } +} diff --git a/libs/utils/src/send_rc.rs b/libs/utils/src/send_rc.rs new file mode 100644 index 0000000000..b7acfef277 --- /dev/null +++ b/libs/utils/src/send_rc.rs @@ -0,0 +1,116 @@ +/// Provides Send wrappers of Rc and RefMut. +use std::{ + borrow::Borrow, + cell::{Ref, RefCell, RefMut}, + ops::{Deref, DerefMut}, + rc::Rc, +}; + +/// Rc wrapper which is Send. +/// This is useful to allow transferring a group of Rcs pointing to the same +/// object between threads, e.g. in self referential struct. +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct SendRc +where + T: ?Sized, +{ + rc: Rc, +} + +// SAFETY: Passing Rc(s) between threads is fine as long as there is no +// concurrent access to the object they point to, so you must move all such Rcs +// together. This appears to be impossible to express in rust type system and +// SendRc doesn't provide any additional protection -- but unlike sendable +// crate, neither it requires any additional actions before/after move. Ensuring +// that sending conforms to the above is the responsibility of the type user. +unsafe impl Send for SendRc {} + +impl SendRc { + /// Constructs a new SendRc + pub fn new(value: T) -> SendRc { + SendRc { rc: Rc::new(value) } + } +} + +// https://stegosaurusdormant.com/understanding-derive-clone/ explains in detail +// why derive Clone doesn't work here. +impl Clone for SendRc { + fn clone(&self) -> Self { + SendRc { + rc: self.rc.clone(), + } + } +} + +// Deref into inner rc. +impl Deref for SendRc { + type Target = Rc; + + fn deref(&self) -> &Self::Target { + &self.rc + } +} + +/// Extends RefCell with borrow[_mut] variants which return Sendable Ref[Mut] +/// wrappers. +pub trait RefCellSend { + fn borrow_mut_send(&self) -> RefMutSend<'_, T>; +} + +impl RefCellSend for RefCell { + fn borrow_mut_send(&self) -> RefMutSend<'_, T> { + RefMutSend { + ref_mut: self.borrow_mut(), + } + } +} + +/// RefMut wrapper which is Send. See impl Send for safety. Allows to move a +/// RefMut along with RefCell it originates from between threads, e.g. have Send +/// Future containing RefMut. +#[derive(Debug)] +pub struct RefMutSend<'b, T> +where + T: 'b + ?Sized, +{ + ref_mut: RefMut<'b, T>, +} + +// SAFETY: Similar to SendRc, this is safe as long as RefMut stays in the same +// thread with original RefCell, so they should be passed together. +// Actually, since this is a referential type violating this is not +// straightforward; examples of unsafe usage could be +// - Passing a RefMut to different thread without source RefCell. Seems only +// possible with std::thread::scope. +// - Somehow multiple threads get access to single RefCell concurrently, +// violating its !Sync requirement. Improper usage of SendRc can do that. +unsafe impl<'b, T: ?Sized + Send> Send for RefMutSend<'b, T> {} + +impl<'b, T> RefMutSend<'b, T> { + /// Constructs a new RefMutSend + pub fn new(ref_mut: RefMut<'b, T>) -> RefMutSend<'b, T> { + RefMutSend { ref_mut } + } +} + +// Deref into inner RefMut. +impl<'b, T> Deref for RefMutSend<'b, T> +where + T: 'b + ?Sized, +{ + type Target = RefMut<'b, T>; + + fn deref<'a>(&'a self) -> &'a RefMut<'b, T> { + &self.ref_mut + } +} + +// DerefMut into inner RefMut. +impl<'b, T> DerefMut for RefMutSend<'b, T> +where + T: 'b + ?Sized, +{ + fn deref_mut<'a>(&'a mut self) -> &'a mut RefMut<'b, T> { + &mut self.ref_mut + } +}