From c38f38dab746ef10b32b350ecd38f5adc4d98a1a Mon Sep 17 00:00:00 2001 From: Dmitry Ivanov Date: Mon, 31 Oct 2022 16:49:40 +0300 Subject: [PATCH] Move pq_proto to its own crate --- Cargo.lock | 26 ++++++- libs/pq_proto/Cargo.toml | 16 ++++ .../src/pq_proto.rs => pq_proto/src/lib.rs} | 8 +- libs/{utils => pq_proto}/src/sync.rs | 2 +- libs/utils/Cargo.toml | 5 +- libs/utils/src/lib.rs | 6 -- libs/utils/src/postgres_backend.rs | 2 +- libs/utils/src/postgres_backend_async.rs | 2 +- pageserver/Cargo.toml | 73 +++++++++---------- pageserver/src/page_service.rs | 2 +- .../src/walreceiver/walreceiver_connection.rs | 4 +- proxy/Cargo.toml | 9 +-- proxy/src/auth/backend/link.rs | 2 +- proxy/src/auth/credentials.rs | 2 +- proxy/src/auth/flow.rs | 2 +- proxy/src/cancellation.rs | 2 +- proxy/src/compute.rs | 4 +- proxy/src/mgmt.rs | 6 +- proxy/src/proxy.rs | 2 +- proxy/src/sasl/messages.rs | 6 +- proxy/src/stream.rs | 2 +- safekeeper/Cargo.toml | 57 ++++++++------- safekeeper/src/control_file_upgrade.rs | 2 +- safekeeper/src/handler.rs | 2 +- safekeeper/src/json_ctrl.rs | 7 +- safekeeper/src/metrics.rs | 2 +- safekeeper/src/receive_wal.rs | 7 +- safekeeper/src/safekeeper.rs | 2 +- safekeeper/src/send_wal.rs | 9 +-- safekeeper/src/timeline.rs | 20 ++--- workspace_hack/Cargo.toml | 4 + 31 files changed, 154 insertions(+), 141 deletions(-) create mode 100644 libs/pq_proto/Cargo.toml rename libs/{utils/src/pq_proto.rs => pq_proto/src/lib.rs} (99%) rename libs/{utils => pq_proto}/src/sync.rs (99%) diff --git a/Cargo.lock b/Cargo.lock index 9a5ac0b1d5..c112c05188 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2145,6 +2145,7 @@ dependencies = [ "postgres-types", "postgres_ffi", "pprof", + "pq_proto", "rand", "regex", "remote_storage", @@ -2438,6 +2439,21 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "pq_proto" +version = "0.1.0" +dependencies = [ + "anyhow", + "bytes", + "pin-project-lite", + "postgres-protocol", + "rand", + "serde", + "tokio", + "tracing", + "workspace_hack", +] + [[package]] name = "prettyplease" version = "0.1.21" @@ -2570,6 +2586,7 @@ dependencies = [ "once_cell", "parking_lot 0.12.1", "pin-project-lite", + "pq_proto", "rand", "rcgen", "reqwest", @@ -3086,6 +3103,7 @@ dependencies = [ "postgres", "postgres-protocol", "postgres_ffi", + "pq_proto", "regex", "remote_storage", "safekeeper_api", @@ -4046,9 +4064,7 @@ dependencies = [ "metrics", "nix 0.25.0", "once_cell", - "pin-project-lite", - "postgres", - "postgres-protocol", + "pq_proto", "rand", "routerify", "rustls", @@ -4373,6 +4389,9 @@ dependencies = [ "crossbeam-utils", "either", "fail", + "futures-channel", + "futures-task", + "futures-util", "hashbrown", "indexmap", "libc", @@ -4386,6 +4405,7 @@ dependencies = [ "rand", "regex", "regex-syntax", + "reqwest", "scopeguard", "serde", "stable_deref_trait", diff --git a/libs/pq_proto/Cargo.toml b/libs/pq_proto/Cargo.toml new file mode 100644 index 0000000000..4d48e431b4 --- /dev/null +++ b/libs/pq_proto/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "pq_proto" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1.0" +bytes = "1.0.1" +pin-project-lite = "0.2.7" +postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } +rand = "0.8.3" +serde = { version = "1.0", features = ["derive"] } +tokio = { version = "1.17", features = ["macros"] } +tracing = "0.1" + +workspace_hack = { version = "0.1", path = "../../workspace_hack" } diff --git a/libs/utils/src/pq_proto.rs b/libs/pq_proto/src/lib.rs similarity index 99% rename from libs/utils/src/pq_proto.rs rename to libs/pq_proto/src/lib.rs index 8c4e297f82..2e311dd6e3 100644 --- a/libs/utils/src/pq_proto.rs +++ b/libs/pq_proto/src/lib.rs @@ -2,7 +2,9 @@ //! //! on message formats. -use crate::sync::{AsyncishRead, SyncFuture}; +// Tools for calling certain async methods in sync contexts. +pub mod sync; + use anyhow::{bail, ensure, Context, Result}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use postgres_protocol::PG_EPOCH; @@ -16,6 +18,7 @@ use std::{ str, time::{Duration, SystemTime}, }; +use sync::{AsyncishRead, SyncFuture}; use tokio::io::AsyncReadExt; use tracing::{trace, warn}; @@ -198,7 +201,7 @@ impl FeMessage { /// /// ``` /// # use std::io; - /// # use utils::pq_proto::FeMessage; + /// # use pq_proto::FeMessage; /// # /// # fn process_message(msg: FeMessage) -> anyhow::Result<()> { /// # Ok(()) @@ -302,6 +305,7 @@ impl FeStartupPacket { Err(e) => return Err(e.into()), }; + #[allow(clippy::manual_range_contains)] if len < 4 || len > MAX_STARTUP_PACKET_LENGTH { bail!("invalid message length"); } diff --git a/libs/utils/src/sync.rs b/libs/pq_proto/src/sync.rs similarity index 99% rename from libs/utils/src/sync.rs rename to libs/pq_proto/src/sync.rs index 48f0ff6384..b7ff1fb70b 100644 --- a/libs/utils/src/sync.rs +++ b/libs/pq_proto/src/sync.rs @@ -29,7 +29,7 @@ impl SyncFuture { /// Example: /// /// ``` - /// # use utils::sync::SyncFuture; + /// # use pq_proto::sync::SyncFuture; /// # use std::future::Future; /// # use tokio::io::AsyncReadExt; /// # diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 1753ee81b9..36a379b47a 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -9,9 +9,6 @@ anyhow = "1.0" bincode = "1.3" bytes = "1.0.1" hyper = { version = "0.14.7", features = ["full"] } -pin-project-lite = "0.2.7" -postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } -postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } routerify = "3" serde = { version = "1.0", features = ["derive"] } serde_json = "1" @@ -33,8 +30,8 @@ once_cell = "1.13.0" strum = "0.24" strum_macros = "0.24" - metrics = { path = "../metrics" } +pq_proto = { path = "../pq_proto" } workspace_hack = { version = "0.1", path = "../../workspace_hack" } [dev-dependencies] diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 6f51465609..11ee7ac7eb 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -1,8 +1,6 @@ //! `utils` is intended to be a place to put code that is shared //! between other crates in this repository. -#![allow(clippy::manual_range_contains)] - /// `Lsn` type implements common tasks on Log Sequence Numbers pub mod lsn; /// SeqWait allows waiting for a future sequence number to arrive @@ -17,7 +15,6 @@ pub mod vec_map; pub mod bin_ser; pub mod postgres_backend; pub mod postgres_backend_async; -pub mod pq_proto; // helper functions for creating and fsyncing pub mod crashsafe; @@ -42,9 +39,6 @@ pub mod lock_file; pub mod accum; pub mod shutdown; -// Tools for calling certain async methods in sync contexts -pub mod sync; - // Utility for binding TcpListeners with proper socket options. pub mod tcp_listener; diff --git a/libs/utils/src/postgres_backend.rs b/libs/utils/src/postgres_backend.rs index adee46c2dd..89f7197718 100644 --- a/libs/utils/src/postgres_backend.rs +++ b/libs/utils/src/postgres_backend.rs @@ -3,10 +3,10 @@ //! implementation determining how to process the queries. Currently its API //! is rather narrow, but we can extend it once required. -use crate::pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket}; use crate::sock_split::{BidiStream, ReadStream, WriteStream}; use anyhow::{bail, ensure, Context, Result}; use bytes::{Bytes, BytesMut}; +use pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket}; use rand::Rng; use serde::{Deserialize, Serialize}; use std::fmt; diff --git a/libs/utils/src/postgres_backend_async.rs b/libs/utils/src/postgres_backend_async.rs index 53f6759d62..376819027b 100644 --- a/libs/utils/src/postgres_backend_async.rs +++ b/libs/utils/src/postgres_backend_async.rs @@ -4,9 +4,9 @@ //! is rather narrow, but we can extend it once required. use crate::postgres_backend::AuthType; -use crate::pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket}; use anyhow::{bail, Context, Result}; use bytes::{Bytes, BytesMut}; +use pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket}; use rand::Rng; use std::future::Future; use std::net::SocketAddr; diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 7ce936ca27..a38978512d 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -12,62 +12,61 @@ testing = ["fail/failpoints"] profiling = ["pprof"] [dependencies] +amplify_num = { git = "https://github.com/hlinnaka/rust-amplify.git", branch = "unsigned-int-perf" } +anyhow = { version = "1.0", features = ["backtrace"] } async-stream = "0.3" async-trait = "0.1" -chrono = "0.4.19" -rand = "0.8.3" -regex = "1.4.5" -bytes = "1.0.1" byteorder = "1.4.3" +bytes = "1.0.1" +chrono = "0.4.19" +clap = { version = "4.0", features = ["string"] } +close_fds = "0.3.2" +const_format = "0.2.21" +crc32c = "0.6.0" +crossbeam-utils = "0.8.5" +fail = "0.5.0" futures = "0.3.13" +git-version = "0.3.5" hex = "0.4.3" +humantime = "2.1.0" +humantime-serde = "1.1.1" hyper = "0.14" itertools = "0.10.3" -clap = { version = "4.0", features = ["string"] } -tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] } -tokio-util = { version = "0.7.3", features = ["io", "io-util"] } -postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } -postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } +nix = "0.25" +num-traits = "0.2.15" +once_cell = "1.13.0" postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } -anyhow = { version = "1.0", features = ["backtrace"] } -crc32c = "0.6.0" -thiserror = "1.0" -tar = "0.4.33" -humantime = "2.1.0" +postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } +postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } +pprof = { git = "https://github.com/neondatabase/pprof-rs.git", branch = "wallclock-profiling", features = ["flamegraph"], optional = true } +rand = "0.8.3" +regex = "1.4.5" +rstar = "0.9.3" +scopeguard = "1.1.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1" serde_with = "2.0" -humantime-serde = "1.1.1" - -pprof = { git = "https://github.com/neondatabase/pprof-rs.git", branch = "wallclock-profiling", features = ["flamegraph"], optional = true } - -toml_edit = { version = "0.14", features = ["easy"] } -scopeguard = "1.1.0" -const_format = "0.2.21" -tracing = "0.1.36" signal-hook = "0.3.10" +svg_fmt = "0.4.1" +tar = "0.4.33" +thiserror = "1.0" +tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } +tokio-util = { version = "0.7.3", features = ["io", "io-util"] } +toml_edit = { version = "0.14", features = ["easy"] } +tracing = "0.1.36" url = "2" -nix = "0.25" -once_cell = "1.13.0" -crossbeam-utils = "0.8.5" -fail = "0.5.0" -git-version = "0.3.5" -rstar = "0.9.3" -num-traits = "0.2.15" -amplify_num = { git = "https://github.com/hlinnaka/rust-amplify.git", branch = "unsigned-int-perf" } +walkdir = "2.3.2" -pageserver_api = { path = "../libs/pageserver_api" } -postgres_ffi = { path = "../libs/postgres_ffi" } etcd_broker = { path = "../libs/etcd_broker" } metrics = { path = "../libs/metrics" } -utils = { path = "../libs/utils" } +pageserver_api = { path = "../libs/pageserver_api" } +postgres_ffi = { path = "../libs/postgres_ffi" } +pq_proto = { path = "../libs/pq_proto" } remote_storage = { path = "../libs/remote_storage" } tenant_size_model = { path = "../libs/tenant_size_model" } +utils = { path = "../libs/utils" } workspace_hack = { version = "0.1", path = "../workspace_hack" } -close_fds = "0.3.2" -walkdir = "2.3.2" -svg_fmt = "0.4.1" [dev-dependencies] criterion = "0.4" diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f83ab1929a..fcc7a5476b 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -19,6 +19,7 @@ use pageserver_api::models::{ PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse, PagestreamNblocksRequest, PagestreamNblocksResponse, }; +use pq_proto::{BeMessage, FeMessage, RowDescriptor}; use std::io; use std::net::TcpListener; use std::str; @@ -33,7 +34,6 @@ use utils::{ lsn::Lsn, postgres_backend::AuthType, postgres_backend_async::{self, PostgresBackend}, - pq_proto::{BeMessage, FeMessage, RowDescriptor}, simple_rcu::RcuReadGuard, }; diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index a4a6af455c..0070834288 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -31,8 +31,8 @@ use crate::{ walrecord::DecodedWALRecord, }; use postgres_ffi::waldecoder::WalStreamDecoder; -use utils::id::TenantTimelineId; -use utils::{lsn::Lsn, pq_proto::ReplicationFeedback}; +use pq_proto::ReplicationFeedback; +use utils::{id::TenantTimelineId, lsn::Lsn}; /// Status of the connection. #[derive(Debug, Clone)] diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 395c22b8bc..14a5450d5e 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -22,11 +22,7 @@ once_cell = "1.13.0" parking_lot = "0.12" pin-project-lite = "0.2.7" rand = "0.8.3" -reqwest = { version = "0.11", default-features = false, features = [ - "blocking", - "json", - "rustls-tls", -] } +reqwest = { version = "0.11", default-features = false, features = [ "json", "rustls-tls" ] } routerify = "3" rustls = "0.20.0" rustls-pemfile = "1" @@ -45,8 +41,9 @@ url = "2.2.2" uuid = { version = "1.2", features = ["v4", "serde"] } x509-parser = "0.14" -utils = { path = "../libs/utils" } metrics = { path = "../libs/metrics" } +pq_proto = { path = "../libs/pq_proto" } +utils = { path = "../libs/utils" } workspace_hack = { version = "0.1", path = "../workspace_hack" } [dev-dependencies] diff --git a/proxy/src/auth/backend/link.rs b/proxy/src/auth/backend/link.rs index c8ca418144..96c6f0ba18 100644 --- a/proxy/src/auth/backend/link.rs +++ b/proxy/src/auth/backend/link.rs @@ -1,8 +1,8 @@ use crate::{auth, compute, error::UserFacingError, stream::PqStream, waiters}; +use pq_proto::{BeMessage as Be, BeParameterStatusMessage}; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{info, info_span}; -use utils::pq_proto::{BeMessage as Be, BeParameterStatusMessage}; #[derive(Debug, Error)] pub enum LinkAuthError { diff --git a/proxy/src/auth/credentials.rs b/proxy/src/auth/credentials.rs index 57128a61f5..907f99b8e0 100644 --- a/proxy/src/auth/credentials.rs +++ b/proxy/src/auth/credentials.rs @@ -1,10 +1,10 @@ //! User credentials used in authentication. use crate::error::UserFacingError; +use pq_proto::StartupMessageParams; use std::borrow::Cow; use thiserror::Error; use tracing::info; -use utils::pq_proto::StartupMessageParams; #[derive(Debug, Error, PartialEq, Eq, Clone)] pub enum ClientCredsParseError { diff --git a/proxy/src/auth/flow.rs b/proxy/src/auth/flow.rs index 5a516fdc30..865af4d2e5 100644 --- a/proxy/src/auth/flow.rs +++ b/proxy/src/auth/flow.rs @@ -2,9 +2,9 @@ use super::{AuthErrorImpl, PasswordHackPayload}; use crate::{sasl, scram, stream::PqStream}; +use pq_proto::{BeAuthenticationSaslMessage, BeMessage, BeMessage as Be}; use std::io; use tokio::io::{AsyncRead, AsyncWrite}; -use utils::pq_proto::{BeAuthenticationSaslMessage, BeMessage, BeMessage as Be}; /// Every authentication selector is supposed to implement this trait. pub trait AuthMethod { diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 404533ad42..b219cd0fa2 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -1,11 +1,11 @@ use anyhow::{anyhow, Context}; use hashbrown::HashMap; use parking_lot::Mutex; +use pq_proto::CancelKeyData; use std::net::SocketAddr; use tokio::net::TcpStream; use tokio_postgres::{CancelToken, NoTls}; use tracing::info; -use utils::pq_proto::CancelKeyData; /// Enables serving `CancelRequest`s. #[derive(Default)] diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 8e4caf6eeb..4771c774a1 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -1,12 +1,12 @@ use crate::{cancellation::CancelClosure, error::UserFacingError}; use futures::TryFutureExt; use itertools::Itertools; +use pq_proto::StartupMessageParams; use std::{io, net::SocketAddr}; use thiserror::Error; use tokio::net::TcpStream; use tokio_postgres::NoTls; use tracing::{error, info}; -use utils::pq_proto::StartupMessageParams; #[derive(Debug, Error)] pub enum ConnectionError { @@ -44,7 +44,7 @@ pub type ComputeConnCfg = tokio_postgres::Config; /// Various compute node info for establishing connection etc. pub struct NodeInfo { - /// Did we send [`utils::pq_proto::BeMessage::AuthenticationOk`]? + /// Did we send [`pq_proto::BeMessage::AuthenticationOk`]? pub reported_auth_ok: bool, /// Compute node connection params. pub config: tokio_postgres::Config, diff --git a/proxy/src/mgmt.rs b/proxy/src/mgmt.rs index 67693b1fb0..06d1a4f106 100644 --- a/proxy/src/mgmt.rs +++ b/proxy/src/mgmt.rs @@ -1,15 +1,13 @@ use crate::auth; use anyhow::Context; +use pq_proto::{BeMessage, SINGLE_COL_ROWDESC}; use serde::Deserialize; use std::{ net::{TcpListener, TcpStream}, thread, }; use tracing::{error, info}; -use utils::{ - postgres_backend::{self, AuthType, PostgresBackend}, - pq_proto::{BeMessage, SINGLE_COL_ROWDESC}, -}; +use utils::postgres_backend::{self, AuthType, PostgresBackend}; /// TODO: move all of that to auth-backend/link.rs when we ditch legacy-console backend diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 889445239a..9257fcd650 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -6,10 +6,10 @@ use anyhow::{bail, Context}; use futures::TryFutureExt; use metrics::{register_int_counter, IntCounter}; use once_cell::sync::Lazy; +use pq_proto::{BeMessage as Be, *}; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{error, info, info_span, Instrument}; -use utils::pq_proto::{BeMessage as Be, *}; const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; diff --git a/proxy/src/sasl/messages.rs b/proxy/src/sasl/messages.rs index f48aee4f26..fb3833c8b6 100644 --- a/proxy/src/sasl/messages.rs +++ b/proxy/src/sasl/messages.rs @@ -1,9 +1,9 @@ //! Definitions for SASL messages. use crate::parse::{split_at_const, split_cstr}; -use utils::pq_proto::{BeAuthenticationSaslMessage, BeMessage}; +use pq_proto::{BeAuthenticationSaslMessage, BeMessage}; -/// SASL-specific payload of [`PasswordMessage`](utils::pq_proto::FeMessage::PasswordMessage). +/// SASL-specific payload of [`PasswordMessage`](pq_proto::FeMessage::PasswordMessage). #[derive(Debug)] pub struct FirstMessage<'a> { /// Authentication method, e.g. `"SCRAM-SHA-256"`. @@ -31,7 +31,7 @@ impl<'a> FirstMessage<'a> { /// A single SASL message. /// This struct is deliberately decoupled from lower-level -/// [`BeAuthenticationSaslMessage`](utils::pq_proto::BeAuthenticationSaslMessage). +/// [`BeAuthenticationSaslMessage`](pq_proto::BeAuthenticationSaslMessage). #[derive(Debug)] pub(super) enum ServerMessage { /// We expect to see more steps. diff --git a/proxy/src/stream.rs b/proxy/src/stream.rs index 2a224944e2..8e4084775c 100644 --- a/proxy/src/stream.rs +++ b/proxy/src/stream.rs @@ -2,6 +2,7 @@ use crate::error::UserFacingError; use anyhow::bail; use bytes::BytesMut; use pin_project_lite::pin_project; +use pq_proto::{BeMessage, FeMessage, FeStartupPacket}; use rustls::ServerConfig; use std::pin::Pin; use std::sync::Arc; @@ -9,7 +10,6 @@ use std::{io, task}; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio_rustls::server::TlsStream; -use utils::pq_proto::{BeMessage, FeMessage, FeStartupPacket}; pin_project! { /// Stream wrapper which implements libpq's protocol. diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 0c0ca2ff9f..658bdfe42c 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -4,41 +4,42 @@ version = "0.1.0" edition = "2021" [dependencies] -regex = "1.4.5" -bytes = "1.0.1" -byteorder = "1.4.3" -hyper = "0.14" -fs2 = "0.4.3" -serde_json = "1" -tracing = "0.1.27" -clap = "4.0" -nix = "0.25" -tokio = { version = "1.17", features = ["macros", "fs"] } -postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } -postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } anyhow = "1.0" -crc32c = "0.6.0" -humantime = "2.1.0" -url = "2.2.2" -signal-hook = "0.3.10" -serde = { version = "1.0", features = ["derive"] } -serde_with = "2.0" -hex = "0.4.3" -const_format = "0.2.21" -tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } -git-version = "0.3.5" async-trait = "0.1" +byteorder = "1.4.3" +bytes = "1.0.1" +clap = "4.0" +const_format = "0.2.21" +crc32c = "0.6.0" +fs2 = "0.4.3" +git-version = "0.3.5" +hex = "0.4.3" +humantime = "2.1.0" +hyper = "0.14" +nix = "0.25" once_cell = "1.13.0" -toml_edit = { version = "0.14", features = ["easy"] } -thiserror = "1" parking_lot = "0.12.1" +postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } +postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } +regex = "1.4.5" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1" +serde_with = "2.0" +signal-hook = "0.3.10" +thiserror = "1" +tokio = { version = "1.17", features = ["macros", "fs"] } +tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } +toml_edit = { version = "0.14", features = ["easy"] } +tracing = "0.1.27" +url = "2.2.2" -safekeeper_api = { path = "../libs/safekeeper_api" } -postgres_ffi = { path = "../libs/postgres_ffi" } -metrics = { path = "../libs/metrics" } -utils = { path = "../libs/utils" } etcd_broker = { path = "../libs/etcd_broker" } +metrics = { path = "../libs/metrics" } +postgres_ffi = { path = "../libs/postgres_ffi" } +pq_proto = { path = "../libs/pq_proto" } remote_storage = { path = "../libs/remote_storage" } +safekeeper_api = { path = "../libs/safekeeper_api" } +utils = { path = "../libs/utils" } workspace_hack = { version = "0.1", path = "../workspace_hack" } [dev-dependencies] diff --git a/safekeeper/src/control_file_upgrade.rs b/safekeeper/src/control_file_upgrade.rs index 856c164be8..95cb96fae9 100644 --- a/safekeeper/src/control_file_upgrade.rs +++ b/safekeeper/src/control_file_upgrade.rs @@ -4,13 +4,13 @@ use crate::safekeeper::{ TermSwitchEntry, }; use anyhow::{bail, Result}; +use pq_proto::SystemId; use serde::{Deserialize, Serialize}; use tracing::*; use utils::{ bin_ser::LeSer, id::{TenantId, TimelineId}, lsn::Lsn, - pq_proto::SystemId, }; /// Persistent consensus state of the acceptor. diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index ca887399e1..a1e0bcbec0 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -12,12 +12,12 @@ use anyhow::{bail, Context, Result}; use postgres_ffi::PG_TLI; use regex::Regex; +use pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID}; use tracing::info; use utils::{ id::{TenantId, TenantTimelineId, TimelineId}, lsn::Lsn, postgres_backend::{self, PostgresBackend}, - pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID}, }; /// Safekeeper handler of postgres commands diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index 8cb2ced238..746b4461b7 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -24,11 +24,8 @@ use crate::timeline::Timeline; use crate::GlobalTimelines; use postgres_ffi::encode_logical_message; use postgres_ffi::WAL_SEGMENT_SIZE; -use utils::{ - lsn::Lsn, - postgres_backend::PostgresBackend, - pq_proto::{BeMessage, RowDescriptor, TEXT_OID}, -}; +use pq_proto::{BeMessage, RowDescriptor, TEXT_OID}; +use utils::{lsn::Lsn, postgres_backend::PostgresBackend}; #[derive(Serialize, Deserialize, Debug)] pub struct AppendLogicalMessage { diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index 095d80623a..d4d3d37737 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -383,7 +383,7 @@ impl Collector for TimelineCollector { let timeline_id = tli.ttid.timeline_id.to_string(); let labels = &[tenant_id.as_str(), timeline_id.as_str()]; - let mut most_advanced: Option = None; + let mut most_advanced: Option = None; for replica in tli.replicas.iter() { if let Some(replica_feedback) = replica.pageserver_feedback { if let Some(current) = most_advanced { diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 09ccfe7758..6577e8c4d6 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -23,11 +23,8 @@ use crate::safekeeper::AcceptorProposerMessage; use crate::safekeeper::ProposerAcceptorMessage; use crate::handler::SafekeeperPostgresHandler; -use utils::{ - postgres_backend::PostgresBackend, - pq_proto::{BeMessage, FeMessage}, - sock_split::ReadStream, -}; +use pq_proto::{BeMessage, FeMessage}; +use utils::{postgres_backend::PostgresBackend, sock_split::ReadStream}; pub struct ReceiveWalConn<'pg> { /// Postgres connection diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 3f9b70f282..7dfa6f636e 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -18,11 +18,11 @@ use crate::control_file; use crate::send_wal::HotStandbyFeedback; use crate::wal_storage; +use pq_proto::{ReplicationFeedback, SystemId}; use utils::{ bin_ser::LeSer, id::{NodeId, TenantId, TenantTimelineId, TimelineId}, lsn::Lsn, - pq_proto::{ReplicationFeedback, SystemId}, }; pub const SK_MAGIC: u32 = 0xcafeceefu32; diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 2829c875ed..576a02c686 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -17,16 +17,11 @@ use std::sync::Arc; use std::time::Duration; use std::{str, thread}; +use pq_proto::{BeMessage, FeMessage, ReplicationFeedback, WalSndKeepAlive, XLogDataBody}; use tokio::sync::watch::Receiver; use tokio::time::timeout; use tracing::*; -use utils::{ - bin_ser::BeSer, - lsn::Lsn, - postgres_backend::PostgresBackend, - pq_proto::{BeMessage, FeMessage, ReplicationFeedback, WalSndKeepAlive, XLogDataBody}, - sock_split::ReadStream, -}; +use utils::{bin_ser::BeSer, lsn::Lsn, postgres_backend::PostgresBackend, sock_split::ReadStream}; // See: https://www.postgresql.org/docs/13/protocol-replication.html const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h'; diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index d8d1fb98ad..a3f0ff94ee 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -2,26 +2,20 @@ //! to glue together SafeKeeper and all other background services. use anyhow::{bail, Result}; - use etcd_broker::subscription_value::SkTimelineInfo; - -use postgres_ffi::XLogSegNo; - -use tokio::{sync::watch, time::Instant}; - -use std::cmp::{max, min}; - use parking_lot::{Mutex, MutexGuard}; - +use postgres_ffi::XLogSegNo; +use pq_proto::ReplicationFeedback; +use std::cmp::{max, min}; use std::path::PathBuf; - -use tokio::sync::mpsc::Sender; +use tokio::{ + sync::{mpsc::Sender, watch}, + time::Instant, +}; use tracing::*; - use utils::{ id::{NodeId, TenantTimelineId}, lsn::Lsn, - pq_proto::ReplicationFeedback, }; use crate::safekeeper::{ diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index f4468d85f0..2daa08c9b6 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -21,6 +21,9 @@ clap = { version = "4", features = ["color", "error-context", "help", "std", "st crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] } either = { version = "1", features = ["use_std"] } fail = { version = "0.5", default-features = false, features = ["failpoints"] } +futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] } +futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] } +futures-util = { version = "0.3", default-features = false, features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] } hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] } indexmap = { version = "1", default-features = false, features = ["std"] } libc = { version = "0.2", features = ["extra_traits", "std"] } @@ -34,6 +37,7 @@ prost = { version = "0.10", features = ["prost-derive", "std"] } rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] } regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] } +reqwest = { version = "0.11", default-features = false, features = ["__rustls", "__tls", "blocking", "hyper-rustls", "json", "rustls", "rustls-pemfile", "rustls-tls", "rustls-tls-webpki-roots", "serde_json", "tokio-rustls", "webpki-roots"] } scopeguard = { version = "1", features = ["use_std"] } serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] } stable_deref_trait = { version = "1", features = ["alloc", "std"] }