Move pq_proto to its own crate

This commit is contained in:
Dmitry Ivanov
2022-10-31 16:49:40 +03:00
parent 71d268c7c4
commit c38f38dab7
31 changed files with 154 additions and 141 deletions

26
Cargo.lock generated
View File

@@ -2145,6 +2145,7 @@ dependencies = [
"postgres-types",
"postgres_ffi",
"pprof",
"pq_proto",
"rand",
"regex",
"remote_storage",
@@ -2438,6 +2439,21 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "pq_proto"
version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"pin-project-lite",
"postgres-protocol",
"rand",
"serde",
"tokio",
"tracing",
"workspace_hack",
]
[[package]]
name = "prettyplease"
version = "0.1.21"
@@ -2570,6 +2586,7 @@ dependencies = [
"once_cell",
"parking_lot 0.12.1",
"pin-project-lite",
"pq_proto",
"rand",
"rcgen",
"reqwest",
@@ -3086,6 +3103,7 @@ dependencies = [
"postgres",
"postgres-protocol",
"postgres_ffi",
"pq_proto",
"regex",
"remote_storage",
"safekeeper_api",
@@ -4046,9 +4064,7 @@ dependencies = [
"metrics",
"nix 0.25.0",
"once_cell",
"pin-project-lite",
"postgres",
"postgres-protocol",
"pq_proto",
"rand",
"routerify",
"rustls",
@@ -4373,6 +4389,9 @@ dependencies = [
"crossbeam-utils",
"either",
"fail",
"futures-channel",
"futures-task",
"futures-util",
"hashbrown",
"indexmap",
"libc",
@@ -4386,6 +4405,7 @@ dependencies = [
"rand",
"regex",
"regex-syntax",
"reqwest",
"scopeguard",
"serde",
"stable_deref_trait",

16
libs/pq_proto/Cargo.toml Normal file
View File

@@ -0,0 +1,16 @@
[package]
name = "pq_proto"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0"
bytes = "1.0.1"
pin-project-lite = "0.2.7"
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
rand = "0.8.3"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.17", features = ["macros"] }
tracing = "0.1"
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -2,7 +2,9 @@
//! <https://www.postgresql.org/docs/devel/protocol-message-formats.html>
//! on message formats.
use crate::sync::{AsyncishRead, SyncFuture};
// Tools for calling certain async methods in sync contexts.
pub mod sync;
use anyhow::{bail, ensure, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_protocol::PG_EPOCH;
@@ -16,6 +18,7 @@ use std::{
str,
time::{Duration, SystemTime},
};
use sync::{AsyncishRead, SyncFuture};
use tokio::io::AsyncReadExt;
use tracing::{trace, warn};
@@ -198,7 +201,7 @@ impl FeMessage {
///
/// ```
/// # use std::io;
/// # use utils::pq_proto::FeMessage;
/// # use pq_proto::FeMessage;
/// #
/// # fn process_message(msg: FeMessage) -> anyhow::Result<()> {
/// # Ok(())
@@ -302,6 +305,7 @@ impl FeStartupPacket {
Err(e) => return Err(e.into()),
};
#[allow(clippy::manual_range_contains)]
if len < 4 || len > MAX_STARTUP_PACKET_LENGTH {
bail!("invalid message length");
}

View File

@@ -29,7 +29,7 @@ impl<S, T: Future> SyncFuture<S, T> {
/// Example:
///
/// ```
/// # use utils::sync::SyncFuture;
/// # use pq_proto::sync::SyncFuture;
/// # use std::future::Future;
/// # use tokio::io::AsyncReadExt;
/// #

View File

@@ -9,9 +9,6 @@ anyhow = "1.0"
bincode = "1.3"
bytes = "1.0.1"
hyper = { version = "0.14.7", features = ["full"] }
pin-project-lite = "0.2.7"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
routerify = "3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
@@ -33,8 +30,8 @@ once_cell = "1.13.0"
strum = "0.24"
strum_macros = "0.24"
metrics = { path = "../metrics" }
pq_proto = { path = "../pq_proto" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
[dev-dependencies]

View File

@@ -1,8 +1,6 @@
//! `utils` is intended to be a place to put code that is shared
//! between other crates in this repository.
#![allow(clippy::manual_range_contains)]
/// `Lsn` type implements common tasks on Log Sequence Numbers
pub mod lsn;
/// SeqWait allows waiting for a future sequence number to arrive
@@ -17,7 +15,6 @@ pub mod vec_map;
pub mod bin_ser;
pub mod postgres_backend;
pub mod postgres_backend_async;
pub mod pq_proto;
// helper functions for creating and fsyncing
pub mod crashsafe;
@@ -42,9 +39,6 @@ pub mod lock_file;
pub mod accum;
pub mod shutdown;
// Tools for calling certain async methods in sync contexts
pub mod sync;
// Utility for binding TcpListeners with proper socket options.
pub mod tcp_listener;

View File

@@ -3,10 +3,10 @@
//! implementation determining how to process the queries. Currently its API
//! is rather narrow, but we can extend it once required.
use crate::pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket};
use crate::sock_split::{BidiStream, ReadStream, WriteStream};
use anyhow::{bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket};
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::fmt;

View File

@@ -4,9 +4,9 @@
//! is rather narrow, but we can extend it once required.
use crate::postgres_backend::AuthType;
use crate::pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket};
use anyhow::{bail, Context, Result};
use bytes::{Bytes, BytesMut};
use pq_proto::{BeMessage, BeParameterStatusMessage, FeMessage, FeStartupPacket};
use rand::Rng;
use std::future::Future;
use std::net::SocketAddr;

View File

@@ -12,62 +12,61 @@ testing = ["fail/failpoints"]
profiling = ["pprof"]
[dependencies]
amplify_num = { git = "https://github.com/hlinnaka/rust-amplify.git", branch = "unsigned-int-perf" }
anyhow = { version = "1.0", features = ["backtrace"] }
async-stream = "0.3"
async-trait = "0.1"
chrono = "0.4.19"
rand = "0.8.3"
regex = "1.4.5"
bytes = "1.0.1"
byteorder = "1.4.3"
bytes = "1.0.1"
chrono = "0.4.19"
clap = { version = "4.0", features = ["string"] }
close_fds = "0.3.2"
const_format = "0.2.21"
crc32c = "0.6.0"
crossbeam-utils = "0.8.5"
fail = "0.5.0"
futures = "0.3.13"
git-version = "0.3.5"
hex = "0.4.3"
humantime = "2.1.0"
humantime-serde = "1.1.1"
hyper = "0.14"
itertools = "0.10.3"
clap = { version = "4.0", features = ["string"] }
tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] }
tokio-util = { version = "0.7.3", features = ["io", "io-util"] }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
nix = "0.25"
num-traits = "0.2.15"
once_cell = "1.13.0"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
anyhow = { version = "1.0", features = ["backtrace"] }
crc32c = "0.6.0"
thiserror = "1.0"
tar = "0.4.33"
humantime = "2.1.0"
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
pprof = { git = "https://github.com/neondatabase/pprof-rs.git", branch = "wallclock-profiling", features = ["flamegraph"], optional = true }
rand = "0.8.3"
regex = "1.4.5"
rstar = "0.9.3"
scopeguard = "1.1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "2.0"
humantime-serde = "1.1.1"
pprof = { git = "https://github.com/neondatabase/pprof-rs.git", branch = "wallclock-profiling", features = ["flamegraph"], optional = true }
toml_edit = { version = "0.14", features = ["easy"] }
scopeguard = "1.1.0"
const_format = "0.2.21"
tracing = "0.1.36"
signal-hook = "0.3.10"
svg_fmt = "0.4.1"
tar = "0.4.33"
thiserror = "1.0"
tokio = { version = "1.17", features = ["process", "sync", "macros", "fs", "rt", "io-util", "time"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
tokio-util = { version = "0.7.3", features = ["io", "io-util"] }
toml_edit = { version = "0.14", features = ["easy"] }
tracing = "0.1.36"
url = "2"
nix = "0.25"
once_cell = "1.13.0"
crossbeam-utils = "0.8.5"
fail = "0.5.0"
git-version = "0.3.5"
rstar = "0.9.3"
num-traits = "0.2.15"
amplify_num = { git = "https://github.com/hlinnaka/rust-amplify.git", branch = "unsigned-int-perf" }
walkdir = "2.3.2"
pageserver_api = { path = "../libs/pageserver_api" }
postgres_ffi = { path = "../libs/postgres_ffi" }
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }
pageserver_api = { path = "../libs/pageserver_api" }
postgres_ffi = { path = "../libs/postgres_ffi" }
pq_proto = { path = "../libs/pq_proto" }
remote_storage = { path = "../libs/remote_storage" }
tenant_size_model = { path = "../libs/tenant_size_model" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
close_fds = "0.3.2"
walkdir = "2.3.2"
svg_fmt = "0.4.1"
[dev-dependencies]
criterion = "0.4"

View File

@@ -19,6 +19,7 @@ use pageserver_api::models::{
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamNblocksRequest, PagestreamNblocksResponse,
};
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::io;
use std::net::TcpListener;
use std::str;
@@ -33,7 +34,6 @@ use utils::{
lsn::Lsn,
postgres_backend::AuthType,
postgres_backend_async::{self, PostgresBackend},
pq_proto::{BeMessage, FeMessage, RowDescriptor},
simple_rcu::RcuReadGuard,
};

View File

@@ -31,8 +31,8 @@ use crate::{
walrecord::DecodedWALRecord,
};
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::id::TenantTimelineId;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback};
use pq_proto::ReplicationFeedback;
use utils::{id::TenantTimelineId, lsn::Lsn};
/// Status of the connection.
#[derive(Debug, Clone)]

View File

@@ -22,11 +22,7 @@ once_cell = "1.13.0"
parking_lot = "0.12"
pin-project-lite = "0.2.7"
rand = "0.8.3"
reqwest = { version = "0.11", default-features = false, features = [
"blocking",
"json",
"rustls-tls",
] }
reqwest = { version = "0.11", default-features = false, features = [ "json", "rustls-tls" ] }
routerify = "3"
rustls = "0.20.0"
rustls-pemfile = "1"
@@ -45,8 +41,9 @@ url = "2.2.2"
uuid = { version = "1.2", features = ["v4", "serde"] }
x509-parser = "0.14"
utils = { path = "../libs/utils" }
metrics = { path = "../libs/metrics" }
pq_proto = { path = "../libs/pq_proto" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
[dev-dependencies]

View File

@@ -1,8 +1,8 @@
use crate::{auth, compute, error::UserFacingError, stream::PqStream, waiters};
use pq_proto::{BeMessage as Be, BeParameterStatusMessage};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, info_span};
use utils::pq_proto::{BeMessage as Be, BeParameterStatusMessage};
#[derive(Debug, Error)]
pub enum LinkAuthError {

View File

@@ -1,10 +1,10 @@
//! User credentials used in authentication.
use crate::error::UserFacingError;
use pq_proto::StartupMessageParams;
use std::borrow::Cow;
use thiserror::Error;
use tracing::info;
use utils::pq_proto::StartupMessageParams;
#[derive(Debug, Error, PartialEq, Eq, Clone)]
pub enum ClientCredsParseError {

View File

@@ -2,9 +2,9 @@
use super::{AuthErrorImpl, PasswordHackPayload};
use crate::{sasl, scram, stream::PqStream};
use pq_proto::{BeAuthenticationSaslMessage, BeMessage, BeMessage as Be};
use std::io;
use tokio::io::{AsyncRead, AsyncWrite};
use utils::pq_proto::{BeAuthenticationSaslMessage, BeMessage, BeMessage as Be};
/// Every authentication selector is supposed to implement this trait.
pub trait AuthMethod {

View File

@@ -1,11 +1,11 @@
use anyhow::{anyhow, Context};
use hashbrown::HashMap;
use parking_lot::Mutex;
use pq_proto::CancelKeyData;
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio_postgres::{CancelToken, NoTls};
use tracing::info;
use utils::pq_proto::CancelKeyData;
/// Enables serving `CancelRequest`s.
#[derive(Default)]

View File

@@ -1,12 +1,12 @@
use crate::{cancellation::CancelClosure, error::UserFacingError};
use futures::TryFutureExt;
use itertools::Itertools;
use pq_proto::StartupMessageParams;
use std::{io, net::SocketAddr};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_postgres::NoTls;
use tracing::{error, info};
use utils::pq_proto::StartupMessageParams;
#[derive(Debug, Error)]
pub enum ConnectionError {
@@ -44,7 +44,7 @@ pub type ComputeConnCfg = tokio_postgres::Config;
/// Various compute node info for establishing connection etc.
pub struct NodeInfo {
/// Did we send [`utils::pq_proto::BeMessage::AuthenticationOk`]?
/// Did we send [`pq_proto::BeMessage::AuthenticationOk`]?
pub reported_auth_ok: bool,
/// Compute node connection params.
pub config: tokio_postgres::Config,

View File

@@ -1,15 +1,13 @@
use crate::auth;
use anyhow::Context;
use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
use serde::Deserialize;
use std::{
net::{TcpListener, TcpStream},
thread,
};
use tracing::{error, info};
use utils::{
postgres_backend::{self, AuthType, PostgresBackend},
pq_proto::{BeMessage, SINGLE_COL_ROWDESC},
};
use utils::postgres_backend::{self, AuthType, PostgresBackend};
/// TODO: move all of that to auth-backend/link.rs when we ditch legacy-console backend

View File

@@ -6,10 +6,10 @@ use anyhow::{bail, Context};
use futures::TryFutureExt;
use metrics::{register_int_counter, IntCounter};
use once_cell::sync::Lazy;
use pq_proto::{BeMessage as Be, *};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{error, info, info_span, Instrument};
use utils::pq_proto::{BeMessage as Be, *};
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";

View File

@@ -1,9 +1,9 @@
//! Definitions for SASL messages.
use crate::parse::{split_at_const, split_cstr};
use utils::pq_proto::{BeAuthenticationSaslMessage, BeMessage};
use pq_proto::{BeAuthenticationSaslMessage, BeMessage};
/// SASL-specific payload of [`PasswordMessage`](utils::pq_proto::FeMessage::PasswordMessage).
/// SASL-specific payload of [`PasswordMessage`](pq_proto::FeMessage::PasswordMessage).
#[derive(Debug)]
pub struct FirstMessage<'a> {
/// Authentication method, e.g. `"SCRAM-SHA-256"`.
@@ -31,7 +31,7 @@ impl<'a> FirstMessage<'a> {
/// A single SASL message.
/// This struct is deliberately decoupled from lower-level
/// [`BeAuthenticationSaslMessage`](utils::pq_proto::BeAuthenticationSaslMessage).
/// [`BeAuthenticationSaslMessage`](pq_proto::BeAuthenticationSaslMessage).
#[derive(Debug)]
pub(super) enum ServerMessage<T> {
/// We expect to see more steps.

View File

@@ -2,6 +2,7 @@ use crate::error::UserFacingError;
use anyhow::bail;
use bytes::BytesMut;
use pin_project_lite::pin_project;
use pq_proto::{BeMessage, FeMessage, FeStartupPacket};
use rustls::ServerConfig;
use std::pin::Pin;
use std::sync::Arc;
@@ -9,7 +10,6 @@ use std::{io, task};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio_rustls::server::TlsStream;
use utils::pq_proto::{BeMessage, FeMessage, FeStartupPacket};
pin_project! {
/// Stream wrapper which implements libpq's protocol.

View File

@@ -4,41 +4,42 @@ version = "0.1.0"
edition = "2021"
[dependencies]
regex = "1.4.5"
bytes = "1.0.1"
byteorder = "1.4.3"
hyper = "0.14"
fs2 = "0.4.3"
serde_json = "1"
tracing = "0.1.27"
clap = "4.0"
nix = "0.25"
tokio = { version = "1.17", features = ["macros", "fs"] }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
anyhow = "1.0"
crc32c = "0.6.0"
humantime = "2.1.0"
url = "2.2.2"
signal-hook = "0.3.10"
serde = { version = "1.0", features = ["derive"] }
serde_with = "2.0"
hex = "0.4.3"
const_format = "0.2.21"
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
git-version = "0.3.5"
async-trait = "0.1"
byteorder = "1.4.3"
bytes = "1.0.1"
clap = "4.0"
const_format = "0.2.21"
crc32c = "0.6.0"
fs2 = "0.4.3"
git-version = "0.3.5"
hex = "0.4.3"
humantime = "2.1.0"
hyper = "0.14"
nix = "0.25"
once_cell = "1.13.0"
toml_edit = { version = "0.14", features = ["easy"] }
thiserror = "1"
parking_lot = "0.12.1"
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
regex = "1.4.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "2.0"
signal-hook = "0.3.10"
thiserror = "1"
tokio = { version = "1.17", features = ["macros", "fs"] }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
toml_edit = { version = "0.14", features = ["easy"] }
tracing = "0.1.27"
url = "2.2.2"
safekeeper_api = { path = "../libs/safekeeper_api" }
postgres_ffi = { path = "../libs/postgres_ffi" }
metrics = { path = "../libs/metrics" }
utils = { path = "../libs/utils" }
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }
postgres_ffi = { path = "../libs/postgres_ffi" }
pq_proto = { path = "../libs/pq_proto" }
remote_storage = { path = "../libs/remote_storage" }
safekeeper_api = { path = "../libs/safekeeper_api" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }
[dev-dependencies]

View File

@@ -4,13 +4,13 @@ use crate::safekeeper::{
TermSwitchEntry,
};
use anyhow::{bail, Result};
use pq_proto::SystemId;
use serde::{Deserialize, Serialize};
use tracing::*;
use utils::{
bin_ser::LeSer,
id::{TenantId, TimelineId},
lsn::Lsn,
pq_proto::SystemId,
};
/// Persistent consensus state of the acceptor.

View File

@@ -12,12 +12,12 @@ use anyhow::{bail, Context, Result};
use postgres_ffi::PG_TLI;
use regex::Regex;
use pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID};
use tracing::info;
use utils::{
id::{TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
postgres_backend::{self, PostgresBackend},
pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID},
};
/// Safekeeper handler of postgres commands

View File

@@ -24,11 +24,8 @@ use crate::timeline::Timeline;
use crate::GlobalTimelines;
use postgres_ffi::encode_logical_message;
use postgres_ffi::WAL_SEGMENT_SIZE;
use utils::{
lsn::Lsn,
postgres_backend::PostgresBackend,
pq_proto::{BeMessage, RowDescriptor, TEXT_OID},
};
use pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
use utils::{lsn::Lsn, postgres_backend::PostgresBackend};
#[derive(Serialize, Deserialize, Debug)]
pub struct AppendLogicalMessage {

View File

@@ -383,7 +383,7 @@ impl Collector for TimelineCollector {
let timeline_id = tli.ttid.timeline_id.to_string();
let labels = &[tenant_id.as_str(), timeline_id.as_str()];
let mut most_advanced: Option<utils::pq_proto::ReplicationFeedback> = None;
let mut most_advanced: Option<pq_proto::ReplicationFeedback> = None;
for replica in tli.replicas.iter() {
if let Some(replica_feedback) = replica.pageserver_feedback {
if let Some(current) = most_advanced {

View File

@@ -23,11 +23,8 @@ use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
use crate::handler::SafekeeperPostgresHandler;
use utils::{
postgres_backend::PostgresBackend,
pq_proto::{BeMessage, FeMessage},
sock_split::ReadStream,
};
use pq_proto::{BeMessage, FeMessage};
use utils::{postgres_backend::PostgresBackend, sock_split::ReadStream};
pub struct ReceiveWalConn<'pg> {
/// Postgres connection

View File

@@ -18,11 +18,11 @@ use crate::control_file;
use crate::send_wal::HotStandbyFeedback;
use crate::wal_storage;
use pq_proto::{ReplicationFeedback, SystemId};
use utils::{
bin_ser::LeSer,
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
pq_proto::{ReplicationFeedback, SystemId},
};
pub const SK_MAGIC: u32 = 0xcafeceefu32;

View File

@@ -17,16 +17,11 @@ use std::sync::Arc;
use std::time::Duration;
use std::{str, thread};
use pq_proto::{BeMessage, FeMessage, ReplicationFeedback, WalSndKeepAlive, XLogDataBody};
use tokio::sync::watch::Receiver;
use tokio::time::timeout;
use tracing::*;
use utils::{
bin_ser::BeSer,
lsn::Lsn,
postgres_backend::PostgresBackend,
pq_proto::{BeMessage, FeMessage, ReplicationFeedback, WalSndKeepAlive, XLogDataBody},
sock_split::ReadStream,
};
use utils::{bin_ser::BeSer, lsn::Lsn, postgres_backend::PostgresBackend, sock_split::ReadStream};
// See: https://www.postgresql.org/docs/13/protocol-replication.html
const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';

View File

@@ -2,26 +2,20 @@
//! to glue together SafeKeeper and all other background services.
use anyhow::{bail, Result};
use etcd_broker::subscription_value::SkTimelineInfo;
use postgres_ffi::XLogSegNo;
use tokio::{sync::watch, time::Instant};
use std::cmp::{max, min};
use parking_lot::{Mutex, MutexGuard};
use postgres_ffi::XLogSegNo;
use pq_proto::ReplicationFeedback;
use std::cmp::{max, min};
use std::path::PathBuf;
use tokio::sync::mpsc::Sender;
use tokio::{
sync::{mpsc::Sender, watch},
time::Instant,
};
use tracing::*;
use utils::{
id::{NodeId, TenantTimelineId},
lsn::Lsn,
pq_proto::ReplicationFeedback,
};
use crate::safekeeper::{

View File

@@ -21,6 +21,9 @@ clap = { version = "4", features = ["color", "error-context", "help", "std", "st
crossbeam-utils = { version = "0.8", features = ["once_cell", "std"] }
either = { version = "1", features = ["use_std"] }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
futures-channel = { version = "0.3", features = ["alloc", "futures-sink", "sink", "std"] }
futures-task = { version = "0.3", default-features = false, features = ["alloc", "std"] }
futures-util = { version = "0.3", default-features = false, features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "sink", "slab", "std"] }
hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
libc = { version = "0.2", features = ["extra_traits", "std"] }
@@ -34,6 +37,7 @@ prost = { version = "0.10", features = ["prost-derive", "std"] }
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
reqwest = { version = "0.11", default-features = false, features = ["__rustls", "__tls", "blocking", "hyper-rustls", "json", "rustls", "rustls-pemfile", "rustls-tls", "rustls-tls-webpki-roots", "serde_json", "tokio-rustls", "webpki-roots"] }
scopeguard = { version = "1", features = ["use_std"] }
serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] }
stable_deref_trait = { version = "1", features = ["alloc", "std"] }