mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 20:50:37 +00:00
Compare commits
4 Commits
release-pr
...
elizabeth/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f076d51643 | ||
|
|
bfc4f5162f | ||
|
|
e4618c11c9 | ||
|
|
df32cc153c |
85
Cargo.lock
generated
85
Cargo.lock
generated
@@ -701,7 +701,7 @@ dependencies = [
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
"hyper 1.4.1",
|
||||
"hyper 1.6.0",
|
||||
"hyper-util",
|
||||
"itoa",
|
||||
"matchit",
|
||||
@@ -2330,7 +2330,7 @@ dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"http-body-util",
|
||||
"hyper 1.4.1",
|
||||
"hyper 1.6.0",
|
||||
"hyper-util",
|
||||
"pin-project",
|
||||
"rand 0.8.5",
|
||||
@@ -2883,9 +2883,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "httparse"
|
||||
version = "1.8.0"
|
||||
version = "1.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
|
||||
checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
|
||||
|
||||
[[package]]
|
||||
name = "httpdate"
|
||||
@@ -2935,9 +2935,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
version = "1.4.1"
|
||||
version = "1.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05"
|
||||
checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-channel",
|
||||
@@ -2977,7 +2977,7 @@ checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"http 1.1.0",
|
||||
"hyper 1.4.1",
|
||||
"hyper 1.6.0",
|
||||
"hyper-util",
|
||||
"rustls 0.22.4",
|
||||
"rustls-pki-types",
|
||||
@@ -2992,7 +2992,7 @@ version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
|
||||
dependencies = [
|
||||
"hyper 1.4.1",
|
||||
"hyper 1.6.0",
|
||||
"hyper-util",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
@@ -3001,20 +3001,20 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "hyper-util"
|
||||
version = "0.1.7"
|
||||
version = "0.1.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9"
|
||||
checksum = "cf9f1e950e0d9d1d3c47184416723cf29c0d1f93bd8cccf37e4beb6b44f31710"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"hyper 1.4.1",
|
||||
"hyper 1.6.0",
|
||||
"libc",
|
||||
"pin-project-lite",
|
||||
"socket2",
|
||||
"tokio",
|
||||
"tower 0.4.13",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
@@ -4236,6 +4236,8 @@ name = "pagebench"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"camino",
|
||||
"clap",
|
||||
"futures",
|
||||
@@ -4244,10 +4246,13 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"pageserver_client_grpc",
|
||||
"pageserver_page_api",
|
||||
"rand 0.8.5",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
@@ -4432,6 +4437,29 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pageserver_client_grpc"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures",
|
||||
"http 1.1.0",
|
||||
"hyper 1.6.0",
|
||||
"hyper-util",
|
||||
"metrics",
|
||||
"pageserver_page_api",
|
||||
"priority-queue",
|
||||
"rand 0.8.5",
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tonic 0.13.1",
|
||||
"tower 0.4.13",
|
||||
"tracing",
|
||||
"utils",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pageserver_compaction"
|
||||
version = "0.1.0"
|
||||
@@ -5008,6 +5036,17 @@ dependencies = [
|
||||
"elliptic-curve 0.13.8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "priority-queue"
|
||||
version = "2.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ef08705fa1589a1a59aa924ad77d14722cb0cd97b67dd5004ed5f4a4873fce8d"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"equivalent",
|
||||
"indexmap 2.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2"
|
||||
version = "1.0.94"
|
||||
@@ -5208,7 +5247,7 @@ dependencies = [
|
||||
"humantime",
|
||||
"humantime-serde",
|
||||
"hyper 0.14.30",
|
||||
"hyper 1.4.1",
|
||||
"hyper 1.6.0",
|
||||
"hyper-util",
|
||||
"indexmap 2.9.0",
|
||||
"ipnet",
|
||||
@@ -5604,7 +5643,7 @@ dependencies = [
|
||||
"http-body-util",
|
||||
"http-types",
|
||||
"humantime-serde",
|
||||
"hyper 1.4.1",
|
||||
"hyper 1.6.0",
|
||||
"itertools 0.10.5",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
@@ -5644,7 +5683,7 @@ dependencies = [
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
"hyper 1.4.1",
|
||||
"hyper 1.6.0",
|
||||
"hyper-rustls 0.26.0",
|
||||
"hyper-util",
|
||||
"ipnet",
|
||||
@@ -5701,7 +5740,7 @@ dependencies = [
|
||||
"futures",
|
||||
"getrandom 0.2.11",
|
||||
"http 1.1.0",
|
||||
"hyper 1.4.1",
|
||||
"hyper 1.6.0",
|
||||
"parking_lot 0.11.2",
|
||||
"reqwest",
|
||||
"reqwest-middleware",
|
||||
@@ -6642,12 +6681,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "socket2"
|
||||
version = "0.5.5"
|
||||
version = "0.5.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9"
|
||||
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"windows-sys 0.48.0",
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6713,7 +6752,7 @@ dependencies = [
|
||||
"http-body-util",
|
||||
"http-utils",
|
||||
"humantime",
|
||||
"hyper 1.4.1",
|
||||
"hyper 1.6.0",
|
||||
"hyper-util",
|
||||
"metrics",
|
||||
"once_cell",
|
||||
@@ -7538,11 +7577,12 @@ dependencies = [
|
||||
"axum",
|
||||
"base64 0.22.1",
|
||||
"bytes",
|
||||
"flate2",
|
||||
"h2 0.4.4",
|
||||
"http 1.1.0",
|
||||
"http-body 1.0.0",
|
||||
"http-body-util",
|
||||
"hyper 1.4.1",
|
||||
"hyper 1.6.0",
|
||||
"hyper-timeout",
|
||||
"hyper-util",
|
||||
"percent-encoding",
|
||||
@@ -7599,6 +7639,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -8591,7 +8632,7 @@ dependencies = [
|
||||
"hex",
|
||||
"hmac",
|
||||
"hyper 0.14.30",
|
||||
"hyper 1.4.1",
|
||||
"hyper 1.6.0",
|
||||
"hyper-util",
|
||||
"indexmap 2.9.0",
|
||||
"itertools 0.12.1",
|
||||
|
||||
@@ -8,6 +8,7 @@ members = [
|
||||
"pageserver/compaction",
|
||||
"pageserver/ctl",
|
||||
"pageserver/client",
|
||||
"pageserver/client_grpc",
|
||||
"pageserver/pagebench",
|
||||
"pageserver/page_api",
|
||||
"proxy",
|
||||
@@ -199,7 +200,7 @@ tokio-tar = "0.3"
|
||||
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
|
||||
toml = "0.8"
|
||||
toml_edit = "0.22"
|
||||
tonic = { version = "0.13.1", default-features = false, features = ["channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] }
|
||||
tonic = { version = "0.13.1", default-features = false, features = ["gzip", "channel", "codegen", "prost", "router", "server", "tls-ring", "tls-native-roots"] }
|
||||
tonic-reflection = { version = "0.13.1", features = ["server"] }
|
||||
tower = { version = "0.5.2", default-features = false }
|
||||
tower-http = { version = "0.6.2", features = ["auth", "request-id", "trace"] }
|
||||
@@ -254,6 +255,7 @@ metrics = { version = "0.1", path = "./libs/metrics/" }
|
||||
pageserver = { path = "./pageserver" }
|
||||
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
|
||||
pageserver_client = { path = "./pageserver/client" }
|
||||
pageserver_client_grpc = { path = "./pageserver/client_grpc" }
|
||||
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
|
||||
pageserver_page_api = { path = "./pageserver/page_api" }
|
||||
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
|
||||
|
||||
25
pageserver/client_grpc/Cargo.toml
Normal file
25
pageserver/client_grpc/Cargo.toml
Normal file
@@ -0,0 +1,25 @@
|
||||
[package]
|
||||
name = "pageserver_client_grpc"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
bytes.workspace = true
|
||||
futures.workspace = true
|
||||
http.workspace = true
|
||||
thiserror.workspace = true
|
||||
tonic.workspace = true
|
||||
tracing.workspace = true
|
||||
tokio = { version = "1.43.1", features = ["full", "macros", "net", "io-util", "rt", "rt-multi-thread"] }
|
||||
uuid = { version = "1", features = ["v4"] }
|
||||
tower = { version = "0.4", features = ["timeout", "util"] }
|
||||
rand = "0.8"
|
||||
tokio-util = { version = "0.7", features = ["compat"] }
|
||||
hyper-util = "0.1.9"
|
||||
hyper = "1.6.0"
|
||||
metrics.workspace = true
|
||||
priority-queue = "2.3.1"
|
||||
|
||||
|
||||
pageserver_page_api.workspace = true
|
||||
utils.workspace = true
|
||||
193
pageserver/client_grpc/src/lib.rs
Normal file
193
pageserver/client_grpc/src/lib.rs
Normal file
@@ -0,0 +1,193 @@
|
||||
//
|
||||
// Pageserver gRPC client library
|
||||
//
|
||||
// This library provides a gRPC client for the pageserver for the
|
||||
// communicator project.
|
||||
//
|
||||
// This library is a work in progress.
|
||||
//
|
||||
// TODO: This should properly use the shard map
|
||||
//
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::{StreamExt};
|
||||
use thiserror::Error;
|
||||
use tonic::metadata::AsciiMetadataValue;
|
||||
|
||||
use pageserver_page_api::model::*;
|
||||
use pageserver_page_api::proto;
|
||||
|
||||
use pageserver_page_api::proto::PageServiceClient;
|
||||
use utils::shard::ShardIndex;
|
||||
|
||||
use std::fmt::Debug;
|
||||
use tracing::error;
|
||||
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use tonic::transport::{Channel, Endpoint};
|
||||
#[derive(Error, Debug)]
|
||||
pub enum PageserverClientError {
|
||||
#[error("could not connect to service: {0}")]
|
||||
ConnectError(#[from] tonic::transport::Error),
|
||||
#[error("could not perform request: {0}`")]
|
||||
RequestError(#[from] tonic::Status),
|
||||
#[error("protocol error: {0}")]
|
||||
ProtocolError(#[from] ProtocolError),
|
||||
#[error("could not perform request: {0}`")]
|
||||
InvalidUri(#[from] http::uri::InvalidUri),
|
||||
#[error("could not perform request: {0}`")]
|
||||
Other(String),
|
||||
}
|
||||
|
||||
pub struct PageserverClient {
|
||||
_tenant_id: String,
|
||||
_timeline_id: String,
|
||||
_auth_token: Option<String>,
|
||||
shard_map: HashMap<ShardIndex, String>,
|
||||
channels: tokio::sync::RwLock<HashMap<ShardIndex, Channel>>,
|
||||
auth_interceptor: AuthInterceptor,
|
||||
}
|
||||
|
||||
impl PageserverClient {
|
||||
/// TODO: this doesn't currently react to changes in the shard map.
|
||||
pub fn new(
|
||||
tenant_id: &str,
|
||||
timeline_id: &str,
|
||||
auth_token: &Option<String>,
|
||||
shard_map: HashMap<ShardIndex, String>,
|
||||
) -> Self {
|
||||
Self {
|
||||
_tenant_id: tenant_id.to_string(),
|
||||
_timeline_id: timeline_id.to_string(),
|
||||
_auth_token: auth_token.clone(),
|
||||
shard_map,
|
||||
channels: RwLock::new(HashMap::new()),
|
||||
auth_interceptor: AuthInterceptor::new(tenant_id, timeline_id, auth_token.as_deref()),
|
||||
}
|
||||
}
|
||||
//
|
||||
// TODO: This opens a new gRPC stream for every request, which is extremely inefficient
|
||||
pub async fn get_page(
|
||||
&self,
|
||||
request: &GetPageRequest,
|
||||
) -> Result<Vec<Bytes>, PageserverClientError> {
|
||||
// FIXME: calculate the shard number correctly
|
||||
let shard = ShardIndex::unsharded();
|
||||
let chan = self.get_client(shard).await;
|
||||
|
||||
let mut client =
|
||||
PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard));
|
||||
|
||||
let request = proto::GetPageRequest::from(request);
|
||||
let request_stream = futures::stream::once(std::future::ready(request));
|
||||
|
||||
let mut response_stream = client
|
||||
.get_pages(tonic::Request::new(request_stream))
|
||||
.await?
|
||||
.into_inner();
|
||||
|
||||
let Some(response) = response_stream.next().await else {
|
||||
return Err(PageserverClientError::Other(
|
||||
"no response received for getpage request".to_string(),
|
||||
));
|
||||
};
|
||||
|
||||
match response {
|
||||
Err(status) => {
|
||||
return Err(PageserverClientError::RequestError(status));
|
||||
}
|
||||
Ok(resp) => {
|
||||
let response: GetPageResponse = resp.try_into().unwrap();
|
||||
return Ok(response.page_images.to_vec());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//
|
||||
// TODO: this should use a connection pool with concurrency limits,
|
||||
// not a single connection to the shard.
|
||||
//
|
||||
async fn get_client(&self, shard: ShardIndex) -> Channel {
|
||||
// Get channel from the hashmap
|
||||
let mut channels = self.channels.write();
|
||||
if let Some(channel) = channels.await.get(&shard) {
|
||||
return channel.clone();
|
||||
}
|
||||
// Create a new channel if it doesn't exist
|
||||
let shard_url = self
|
||||
.shard_map
|
||||
.get(&shard)
|
||||
.expect("shard not found in shard map");
|
||||
|
||||
let attempt = Endpoint::from_shared(shard_url.clone())
|
||||
.expect("invalid endpoint")
|
||||
.connect()
|
||||
.await;
|
||||
|
||||
match attempt {
|
||||
Ok(channel) => {
|
||||
channels = self.channels.write();
|
||||
channels.await.insert(shard, channel.clone());
|
||||
channel.clone()
|
||||
}
|
||||
Err(e) => {
|
||||
// TODO: handle this more gracefully, e.g. with a connection pool retry
|
||||
panic!("Failed to connect to shard {shard}: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Inject tenant_id, timeline_id and authentication token to all pageserver requests.
|
||||
#[derive(Clone)]
|
||||
struct AuthInterceptor {
|
||||
tenant_id: AsciiMetadataValue,
|
||||
shard_id: Option<AsciiMetadataValue>,
|
||||
timeline_id: AsciiMetadataValue,
|
||||
auth_header: Option<AsciiMetadataValue>, // including "Bearer " prefix
|
||||
}
|
||||
|
||||
impl AuthInterceptor {
|
||||
fn new(tenant_id: &str, timeline_id: &str, auth_token: Option<&str>) -> Self {
|
||||
Self {
|
||||
tenant_id: tenant_id.parse().expect("could not parse tenant id"),
|
||||
shard_id: None,
|
||||
timeline_id: timeline_id.parse().expect("could not parse timeline id"),
|
||||
auth_header: auth_token
|
||||
.map(|t| format!("Bearer {t}"))
|
||||
.map(|t| t.parse().expect("could not parse auth token")),
|
||||
}
|
||||
}
|
||||
|
||||
fn for_shard(&self, shard_id: ShardIndex) -> Self {
|
||||
let mut with_shard = self.clone();
|
||||
with_shard.shard_id = Some(
|
||||
shard_id
|
||||
.to_string()
|
||||
.parse()
|
||||
.expect("could not parse shard id"),
|
||||
);
|
||||
with_shard
|
||||
}
|
||||
}
|
||||
|
||||
impl tonic::service::Interceptor for AuthInterceptor {
|
||||
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
|
||||
req.metadata_mut()
|
||||
.insert("neon-tenant-id", self.tenant_id.clone());
|
||||
if let Some(shard_id) = &self.shard_id {
|
||||
req.metadata_mut().insert("neon-shard-id", shard_id.clone());
|
||||
}
|
||||
req.metadata_mut()
|
||||
.insert("neon-timeline-id", self.timeline_id.clone());
|
||||
if let Some(auth_header) = &self.auth_header {
|
||||
req.metadata_mut()
|
||||
.insert("authorization", auth_header.clone());
|
||||
}
|
||||
Ok(req)
|
||||
}
|
||||
}
|
||||
@@ -18,6 +18,6 @@ pub mod proto {
|
||||
pub use page_service_server::{PageService, PageServiceServer};
|
||||
}
|
||||
|
||||
mod model;
|
||||
pub mod model;
|
||||
|
||||
pub use model::*;
|
||||
|
||||
@@ -102,6 +102,15 @@ impl TryFrom<ReadLsn> for proto::ReadLsn {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&ReadLsn> for proto::ReadLsn {
|
||||
fn from(value: &ReadLsn) -> proto::ReadLsn {
|
||||
proto::ReadLsn {
|
||||
request_lsn: value.request_lsn.into(),
|
||||
not_modified_since_lsn: value.not_modified_since_lsn.unwrap_or_default().0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// RelTag is defined in pageserver_api::reltag.
|
||||
pub type RelTag = pageserver_api::reltag::RelTag;
|
||||
|
||||
@@ -132,6 +141,16 @@ impl From<RelTag> for proto::RelTag {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&RelTag> for proto::RelTag {
|
||||
fn from(value: &RelTag) -> proto::RelTag {
|
||||
proto::RelTag {
|
||||
spc_oid: value.spcnode,
|
||||
db_oid: value.dbnode,
|
||||
rel_number: value.relnode,
|
||||
fork_number: value.forknum as u32,
|
||||
}
|
||||
}
|
||||
}
|
||||
/// Checks whether a relation exists, at the given LSN. Only valid on shard 0, other shards error.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct CheckRelExistsRequest {
|
||||
@@ -311,6 +330,17 @@ impl TryFrom<proto::GetPageRequest> for GetPageRequest {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&GetPageRequest> for proto::GetPageRequest {
|
||||
fn from(request: &GetPageRequest) -> proto::GetPageRequest {
|
||||
proto::GetPageRequest {
|
||||
request_id: request.request_id,
|
||||
request_class: request.request_class.into(),
|
||||
read_lsn: Some(request.read_lsn.try_into().unwrap()),
|
||||
rel: Some(request.rel.into()),
|
||||
block_number: request.block_numbers.clone().into_vec(),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl TryFrom<GetPageRequest> for proto::GetPageRequest {
|
||||
type Error = ProtocolError;
|
||||
|
||||
|
||||
@@ -10,19 +10,24 @@ license.workspace = true
|
||||
anyhow.workspace = true
|
||||
camino.workspace = true
|
||||
clap.workspace = true
|
||||
thiserror.workspace = true
|
||||
futures.workspace = true
|
||||
hdrhistogram.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest.workspace=true
|
||||
bytes.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tracing.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
async-trait = "0.1"
|
||||
|
||||
pageserver_client.workspace = true
|
||||
pageserver_client_grpc.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_page_api.workspace = true
|
||||
utils = { path = "../../libs/utils/" }
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
@@ -11,20 +11,32 @@ use camino::Utf8PathBuf;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::KeySpaceAccum;
|
||||
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
|
||||
use pageserver_page_api::model::{GetPageClass};
|
||||
use pageserver_client::page_service::PagestreamClient;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::prelude::*;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
use utils::id::TenantTimelineId;
|
||||
use utils::id::TenantId;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
|
||||
|
||||
use utils::shard::ShardIndex;
|
||||
use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt, StreamExt};
|
||||
|
||||
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
|
||||
use crate::util::{request_stats, tokio_thread_local_stats};
|
||||
|
||||
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
|
||||
#[derive(clap::Parser)]
|
||||
pub(crate) struct Args {
|
||||
|
||||
#[clap(long, default_value = "false")]
|
||||
grpc: bool,
|
||||
#[clap(long, default_value = "http://localhost:9898")]
|
||||
mgmt_api_endpoint: String,
|
||||
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
|
||||
@@ -118,6 +130,7 @@ struct Output {
|
||||
total: request_stats::Output,
|
||||
}
|
||||
|
||||
|
||||
tokio_thread_local_stats::declare!(STATS: request_stats::Stats);
|
||||
|
||||
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
|
||||
@@ -303,7 +316,20 @@ async fn main_impl(
|
||||
.unwrap();
|
||||
|
||||
Box::pin(async move {
|
||||
client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
|
||||
if args.grpc {
|
||||
let grpc = GrpcProtocol::new(
|
||||
args.page_service_connstring.clone(),
|
||||
worker_id.timeline.tenant_id,
|
||||
worker_id.timeline.timeline_id).await;
|
||||
client_proto(args, grpc, worker_id, ss, cancel, rps_period, ranges, weights).await
|
||||
} else {
|
||||
let pg = PgProtocol::new(
|
||||
args.page_service_connstring.clone(),
|
||||
worker_id.timeline.tenant_id,
|
||||
worker_id.timeline.timeline_id).await;
|
||||
client_proto(args, pg, worker_id, ss, cancel, rps_period, ranges, weights).await
|
||||
}
|
||||
|
||||
})
|
||||
};
|
||||
|
||||
@@ -354,9 +380,212 @@ async fn main_impl(
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
// src/protocol.rs
|
||||
use async_trait::async_trait;
|
||||
use rand::distributions::weighted::WeightedIndex;
|
||||
|
||||
async fn client_libpq(
|
||||
// — your existing imports for PagestreamClient, PageserverClientError, KeyRange, etc. —
|
||||
|
||||
/// Common interface for both Pg and Grpc versions.
|
||||
#[async_trait]
|
||||
trait Protocol: Send {
|
||||
/// Constructor/factory.
|
||||
async fn new(
|
||||
conn_string: String,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Self
|
||||
where
|
||||
Self: Sized;
|
||||
|
||||
/// Fire off a “get page” request and store the start time.
|
||||
async fn add_to_inflight(
|
||||
&mut self,
|
||||
start: Instant,
|
||||
args: &Args,
|
||||
ranges: Vec<KeyRange>,
|
||||
weights: WeightedIndex<i128>,
|
||||
);
|
||||
|
||||
/// Wait for the next response and return its start time.
|
||||
async fn get_start_time(&mut self) -> Instant;
|
||||
|
||||
/// How many in-flight requests do we have?
|
||||
fn len(&self) -> usize;
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// PgProtocol
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
struct PgProtocol {
|
||||
libpq_pagestream: PagestreamClient,
|
||||
libpq_vector: VecDeque<Instant>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Protocol for PgProtocol {
|
||||
async fn new(
|
||||
conn_string: String,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Self {
|
||||
let client = pageserver_client::page_service::Client::new(conn_string)
|
||||
.await
|
||||
.unwrap()
|
||||
.pagestream(tenant_id, timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
Self {
|
||||
libpq_pagestream: client,
|
||||
libpq_vector: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn add_to_inflight(
|
||||
&mut self,
|
||||
start: Instant,
|
||||
args: &Args,
|
||||
ranges: Vec<KeyRange>,
|
||||
weights: WeightedIndex<i128>,
|
||||
) {
|
||||
// build your PagestreamGetPageRequest exactly as before…
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(key.is_rel_block_key());
|
||||
let (rel_tag, block_no) = key.to_rel_block().unwrap();
|
||||
PagestreamGetPageRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid: 0,
|
||||
request_lsn: if rng.gen_bool(args.req_latest_probability) {
|
||||
Lsn::MAX
|
||||
} else {
|
||||
r.timeline_lsn
|
||||
},
|
||||
not_modified_since: r.timeline_lsn,
|
||||
},
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
|
||||
let _ = self.libpq_pagestream.getpage_send(req).await;
|
||||
self.libpq_vector.push_back(start);
|
||||
}
|
||||
|
||||
async fn get_start_time(&mut self) -> Instant {
|
||||
let start = self.libpq_vector.pop_front().unwrap();
|
||||
let _ = self.libpq_pagestream.getpage_recv().await;
|
||||
start
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
self.libpq_vector.len()
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// GrpcProtocol
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
type GetPageFut = BoxFuture<'static, (Instant, Option<pageserver_client_grpc::PageserverClientError>)>;
|
||||
struct GrpcProtocol {
|
||||
grpc_page_client: Arc<pageserver_client_grpc::PageserverClient>,
|
||||
grpc_vector: FuturesOrdered<GetPageFut>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Protocol for GrpcProtocol {
|
||||
async fn new(
|
||||
conn_string: String,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Self {
|
||||
let shard_map = std::collections::HashMap::from([(
|
||||
ShardIndex::unsharded(),
|
||||
conn_string.clone(),
|
||||
)]);
|
||||
let client = pageserver_client_grpc::PageserverClient::new(
|
||||
&tenant_id.to_string(),
|
||||
&timeline_id.to_string(),
|
||||
&None,
|
||||
shard_map,
|
||||
);
|
||||
Self {
|
||||
grpc_page_client: Arc::new(client),
|
||||
grpc_vector: FuturesOrdered::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn add_to_inflight(
|
||||
&mut self,
|
||||
start: Instant,
|
||||
args: &Args,
|
||||
ranges: Vec<KeyRange>,
|
||||
weights: WeightedIndex<i128>,
|
||||
) {
|
||||
// build your GetPageRequest exactly as before…
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(key.is_rel_block_key());
|
||||
let (rel_tag, block_no) = key.to_rel_block().unwrap();
|
||||
pageserver_page_api::model::GetPageRequest {
|
||||
request_id: 0,
|
||||
request_class: GetPageClass::Normal,
|
||||
read_lsn: pageserver_page_api::model::ReadLsn {
|
||||
request_lsn: if rng.gen_bool(args.req_latest_probability) {
|
||||
Lsn::MAX
|
||||
} else {
|
||||
r.timeline_lsn
|
||||
},
|
||||
not_modified_since_lsn: Some(r.timeline_lsn),
|
||||
},
|
||||
rel: pageserver_page_api::model::RelTag {
|
||||
spcnode: rel_tag.spcnode,
|
||||
dbnode: rel_tag.dbnode,
|
||||
relnode: rel_tag.relnode,
|
||||
forknum: rel_tag.forknum,
|
||||
},
|
||||
block_numbers: vec![block_no].into(),
|
||||
}
|
||||
};
|
||||
|
||||
let client_clone = self.grpc_page_client.clone();
|
||||
let getpage_fut : GetPageFut = async move {
|
||||
let result = client_clone.get_page(&req).await;
|
||||
match result {
|
||||
Ok(_) => {
|
||||
(start, None)
|
||||
}
|
||||
Err(e) => {
|
||||
(start, Some(e))
|
||||
}
|
||||
}
|
||||
}.boxed();
|
||||
self.grpc_vector.push_back(getpage_fut);
|
||||
}
|
||||
|
||||
async fn get_start_time(&mut self) -> Instant {
|
||||
let (start, err) = self.grpc_vector.next().await.unwrap();
|
||||
if let Some(e) = err {
|
||||
tracing::error!("getpage request failed: {e}");
|
||||
}
|
||||
start
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
self.grpc_vector.len()
|
||||
}
|
||||
}
|
||||
|
||||
async fn client_proto(
|
||||
args: &Args,
|
||||
mut protocol: impl Protocol,
|
||||
worker_id: WorkerId,
|
||||
shared_state: Arc<SharedState>,
|
||||
cancel: CancellationToken,
|
||||
@@ -364,18 +593,11 @@ async fn client_libpq(
|
||||
ranges: Vec<KeyRange>,
|
||||
weights: rand::distributions::weighted::WeightedIndex<i128>,
|
||||
) {
|
||||
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut client = client
|
||||
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
shared_state.start_work_barrier.wait().await;
|
||||
let client_start = Instant::now();
|
||||
let mut ticks_processed = 0;
|
||||
let mut inflight = VecDeque::new();
|
||||
while !cancel.is_cancelled() {
|
||||
// Detect if a request took longer than the RPS rate
|
||||
if let Some(period) = &rps_period {
|
||||
@@ -390,37 +612,12 @@ async fn client_libpq(
|
||||
ticks_processed = periods_passed_until_now;
|
||||
}
|
||||
|
||||
while inflight.len() < args.queue_depth.get() {
|
||||
while protocol.len() < args.queue_depth.get() {
|
||||
let start = Instant::now();
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(key.is_rel_block_key());
|
||||
let (rel_tag, block_no) = key
|
||||
.to_rel_block()
|
||||
.expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid: 0,
|
||||
request_lsn: if rng.gen_bool(args.req_latest_probability) {
|
||||
Lsn::MAX
|
||||
} else {
|
||||
r.timeline_lsn
|
||||
},
|
||||
not_modified_since: r.timeline_lsn,
|
||||
},
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
client.getpage_send(req).await.unwrap();
|
||||
inflight.push_back(start);
|
||||
protocol.add_to_inflight(start, args, ranges.clone(), weights.clone()).await;
|
||||
}
|
||||
|
||||
let start = inflight.pop_front().unwrap();
|
||||
client.getpage_recv().await.unwrap();
|
||||
let start = protocol.get_start_time().await;
|
||||
let end = Instant::now();
|
||||
shared_state.live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
|
||||
Reference in New Issue
Block a user