mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
include lots of changes that went missing by accident
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -4340,6 +4340,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tracing",
|
||||
"utils",
|
||||
@@ -4526,6 +4527,7 @@ name = "pageserver_client_grpc"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures",
|
||||
"http 1.1.0",
|
||||
"pageserver_data_api",
|
||||
"thiserror 1.0.69",
|
||||
|
||||
@@ -5,6 +5,7 @@ edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
bytes.workspace = true
|
||||
futures.workspace = true
|
||||
http.workspace = true
|
||||
thiserror.workspace = true
|
||||
tonic.workspace = true
|
||||
|
||||
@@ -7,9 +7,8 @@ use std::collections::HashMap;
|
||||
use std::sync::RwLock;
|
||||
|
||||
use bytes::Bytes;
|
||||
use http;
|
||||
use futures::Stream;
|
||||
use thiserror::Error;
|
||||
use tonic;
|
||||
use tonic::metadata::AsciiMetadataValue;
|
||||
use tonic::transport::Channel;
|
||||
|
||||
@@ -108,6 +107,21 @@ impl PageserverClient {
|
||||
Ok(response.into_inner().page_image)
|
||||
}
|
||||
|
||||
pub async fn get_pages(
|
||||
&self,
|
||||
requests: impl Stream<Item = proto::GetPageRequest> + Send + 'static,
|
||||
) -> std::result::Result<
|
||||
tonic::Response<tonic::codec::Streaming<proto::GetPageResponse>>,
|
||||
PageserverClientError,
|
||||
> {
|
||||
// FIXME: calculate the shard number correctly
|
||||
let shard_no = 0;
|
||||
|
||||
let mut client = self.get_client(shard_no).await?;
|
||||
|
||||
Ok(client.get_pages(tonic::Request::new(requests)).await?)
|
||||
}
|
||||
|
||||
/// Process a request to get the size of a database.
|
||||
pub async fn process_dbsize_request(
|
||||
&self,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Generate rust code from .proto protobuf.
|
||||
tonic_build::configure()
|
||||
.bytes(&["."])
|
||||
.bytes(["."])
|
||||
.compile_protos(&["proto/page_service.proto"], &["proto"])
|
||||
.unwrap_or_else(|e| panic!("failed to compile protos {:?}", e));
|
||||
Ok(())
|
||||
|
||||
@@ -17,8 +17,12 @@ service PageService {
|
||||
// Returns size of a relation, as # of blocks
|
||||
rpc RelSize (RelSizeRequest) returns (RelSizeResponse);
|
||||
|
||||
// Fetches a page.
|
||||
rpc GetPage (GetPageRequest) returns (GetPageResponse);
|
||||
|
||||
// Streaming GetPage protocol.
|
||||
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
|
||||
|
||||
// Returns total size of a database, as # of bytes
|
||||
rpc DbSize (DbSizeRequest) returns (DbSizeResponse);
|
||||
|
||||
|
||||
@@ -15,11 +15,12 @@ hdrhistogram.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest.workspace=true
|
||||
reqwest.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tracing.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-util.workspace = true
|
||||
|
||||
pageserver_client.workspace = true
|
||||
|
||||
@@ -9,7 +9,6 @@ use anyhow::Context;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client::mgmt_api::ForceAwaitLogicalSize;
|
||||
use pageserver_client::page_service::BasebackupRequest;
|
||||
use pageserver_client_grpc;
|
||||
use pageserver_data_api::model::{GetBaseBackupRequest, RequestCommon};
|
||||
|
||||
use rand::prelude::*;
|
||||
|
||||
@@ -29,6 +29,8 @@ use crate::util::{request_stats, tokio_thread_local_stats};
|
||||
pub(crate) struct Args {
|
||||
#[clap(long, default_value = "false")]
|
||||
grpc: bool,
|
||||
#[clap(long, default_value = "false")]
|
||||
grpc_stream: bool,
|
||||
#[clap(long, default_value = "http://localhost:9898")]
|
||||
mgmt_api_endpoint: String,
|
||||
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
|
||||
@@ -299,7 +301,18 @@ async fn main_impl(
|
||||
.unwrap();
|
||||
|
||||
Box::pin(async move {
|
||||
if args.grpc {
|
||||
if args.grpc_stream {
|
||||
client_grpc_stream(
|
||||
args,
|
||||
worker_id,
|
||||
ss,
|
||||
cancel,
|
||||
rps_period,
|
||||
ranges,
|
||||
weights,
|
||||
)
|
||||
.await
|
||||
} else if args.grpc {
|
||||
client_grpc(
|
||||
args,
|
||||
worker_id,
|
||||
@@ -461,6 +474,7 @@ async fn client_libpq(
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn client_grpc(
|
||||
args: &Args,
|
||||
worker_id: WorkerId,
|
||||
@@ -557,3 +571,103 @@ async fn client_grpc(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn client_grpc_stream(
|
||||
args: &Args,
|
||||
worker_id: WorkerId,
|
||||
shared_state: Arc<SharedState>,
|
||||
cancel: CancellationToken,
|
||||
rps_period: Option<Duration>,
|
||||
ranges: Vec<KeyRange>,
|
||||
weights: rand::distributions::weighted::WeightedIndex<i128>,
|
||||
) {
|
||||
let shard_map = HashMap::from([(0, args.page_service_connstring.clone())]);
|
||||
let client = pageserver_client_grpc::PageserverClient::new(
|
||||
&worker_id.timeline.tenant_id.to_string(),
|
||||
&worker_id.timeline.timeline_id.to_string(),
|
||||
&None,
|
||||
shard_map,
|
||||
);
|
||||
|
||||
let (request_tx, request_rx) = tokio::sync::mpsc::channel(1);
|
||||
let request_stream = tokio_stream::wrappers::ReceiverStream::new(request_rx);
|
||||
let mut response_stream = client.get_pages(request_stream).await.unwrap().into_inner();
|
||||
|
||||
shared_state.start_work_barrier.wait().await;
|
||||
let client_start = Instant::now();
|
||||
let mut ticks_processed = 0;
|
||||
let mut inflight = VecDeque::new();
|
||||
|
||||
while !cancel.is_cancelled() {
|
||||
// Detect if a request took longer than the RPS rate
|
||||
if let Some(period) = &rps_period {
|
||||
let periods_passed_until_now =
|
||||
usize::try_from(client_start.elapsed().as_micros() / period.as_micros()).unwrap();
|
||||
|
||||
if periods_passed_until_now > ticks_processed {
|
||||
shared_state.live_stats.missed((periods_passed_until_now - ticks_processed) as u64);
|
||||
}
|
||||
ticks_processed = periods_passed_until_now;
|
||||
}
|
||||
|
||||
// Send requests until the queue depth is reached
|
||||
while inflight.len() < args.queue_depth.get() {
|
||||
let start = Instant::now();
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(key.is_rel_block_key());
|
||||
let (rel_tag, block_no) = key
|
||||
.to_rel_block()
|
||||
.expect("we filter non-rel-block keys out above");
|
||||
pageserver_data_api::model::GetPageRequest {
|
||||
common: pageserver_data_api::model::RequestCommon {
|
||||
request_lsn: if rng.gen_bool(args.req_latest_probability) {
|
||||
Lsn::MAX
|
||||
} else {
|
||||
r.timeline_lsn
|
||||
},
|
||||
not_modified_since_lsn: r.timeline_lsn,
|
||||
},
|
||||
rel: pageserver_data_api::model::RelTag {
|
||||
spc_oid: rel_tag.spcnode,
|
||||
db_oid: rel_tag.dbnode,
|
||||
rel_number: rel_tag.relnode,
|
||||
fork_number: rel_tag.forknum,
|
||||
},
|
||||
block_number: block_no,
|
||||
}
|
||||
};
|
||||
request_tx.send((&req).into()).await.unwrap();
|
||||
inflight.push_back(start);
|
||||
}
|
||||
|
||||
// Receive responses for the inflight requests
|
||||
if let Some(response) = response_stream.next().await {
|
||||
response.unwrap(); // Ensure the response is successful
|
||||
let start = inflight.pop_front().unwrap();
|
||||
let end = Instant::now();
|
||||
shared_state.live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
STATS.with(|stats| {
|
||||
stats
|
||||
.borrow()
|
||||
.lock()
|
||||
.unwrap()
|
||||
.observe(end.duration_since(start))
|
||||
.unwrap();
|
||||
});
|
||||
}
|
||||
|
||||
// Enforce RPS limit if specified
|
||||
if let Some(period) = &rps_period {
|
||||
let next_at = client_start
|
||||
+ Duration::from_micros(
|
||||
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
|
||||
);
|
||||
tokio::time::sleep_until(next_at.into()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,8 @@ use crate::tenant::mgr::ShardResolveResult;
|
||||
use crate::tenant::mgr::ShardSelector;
|
||||
use crate::tenant::storage_layer::IoConcurrency;
|
||||
use crate::tenant::timeline::WaitLsnTimeout;
|
||||
use async_stream::try_stream;
|
||||
use futures::Stream;
|
||||
use tokio::io::{AsyncWriteExt, ReadHalf, SimplexStream};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::codec::{Decoder, FramedRead};
|
||||
@@ -47,10 +49,11 @@ use bytes::BytesMut;
|
||||
use jsonwebtoken::TokenData;
|
||||
use tracing::Instrument;
|
||||
use tracing::{debug, error};
|
||||
use utils::auth::SwappableJwtAuth;
|
||||
use utils::auth::{Claims, SwappableJwtAuth};
|
||||
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::shard::ShardIndex;
|
||||
use utils::simple_rcu::RcuReadGuard;
|
||||
|
||||
use crate::tenant::PageReconstructError;
|
||||
@@ -144,6 +147,8 @@ fn convert_reltag(value: &model::RelTag) -> pageserver_api::reltag::RelTag {
|
||||
#[tonic::async_trait]
|
||||
impl PageService for PageServiceService {
|
||||
type GetBaseBackupStream = GetBaseBackupStream;
|
||||
type GetPagesStream =
|
||||
Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
|
||||
|
||||
async fn rel_exists(
|
||||
&self,
|
||||
@@ -258,14 +263,64 @@ impl PageService for PageServiceService {
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(tonic::Response::new(proto::GetPageResponse {
|
||||
page_image: page_image,
|
||||
}))
|
||||
Ok(tonic::Response::new(proto::GetPageResponse { page_image }))
|
||||
}
|
||||
.instrument(span)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_pages(
|
||||
&self,
|
||||
request: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
|
||||
) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
|
||||
// TODO: pass the shard index in the request metadata.
|
||||
let ttid = self.extract_ttid(request.metadata())?;
|
||||
let timeline = self
|
||||
.get_timeline(ttid, ShardSelector::Known(ShardIndex::unsharded()))
|
||||
.await?;
|
||||
let ctx = self.ctx.with_scope_timeline(&timeline);
|
||||
let conf = self.conf;
|
||||
|
||||
let mut request_stream = request.into_inner();
|
||||
|
||||
let response_stream = try_stream! {
|
||||
while let Some(request) = request_stream.message().await? {
|
||||
let guard = timeline
|
||||
.gate
|
||||
.enter()
|
||||
.or(Err(tonic::Status::unavailable("timeline is shutting down")))?;
|
||||
|
||||
let request: model::GetPageRequest = (&request).try_into()?;
|
||||
let rel = convert_reltag(&request.rel);
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
&timeline,
|
||||
request.common.request_lsn,
|
||||
request.common.not_modified_since_lsn,
|
||||
&latest_gc_cutoff_lsn,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let page_image = timeline
|
||||
.get_rel_page_at_lsn(
|
||||
rel,
|
||||
request.block_number,
|
||||
Version::Lsn(lsn),
|
||||
&ctx,
|
||||
IoConcurrency::spawn_from_conf(conf, guard),
|
||||
)
|
||||
.await?;
|
||||
|
||||
yield proto::GetPageResponse { page_image };
|
||||
}
|
||||
};
|
||||
|
||||
Ok(tonic::Response::new(
|
||||
Box::pin(response_stream) as Self::GetPagesStream
|
||||
))
|
||||
}
|
||||
|
||||
async fn db_size(
|
||||
&self,
|
||||
request: tonic::Request<proto::DbSizeRequest>,
|
||||
@@ -641,7 +696,7 @@ impl tonic::service::Interceptor for PageServiceAuthenticator {
|
||||
let jwtdata: TokenData<utils::auth::Claims> = auth
|
||||
.decode(jwt)
|
||||
.map_err(|err| tonic::Status::unauthenticated(format!("invalid JWT token: {}", err)))?;
|
||||
let claims = jwtdata.claims;
|
||||
let claims: Claims = jwtdata.claims;
|
||||
|
||||
if matches!(claims.scope, utils::auth::Scope::Tenant) && claims.tenant_id.is_none() {
|
||||
return Err(tonic::Status::unauthenticated(
|
||||
@@ -669,7 +724,6 @@ impl tonic::service::Interceptor for PageServiceAuthenticator {
|
||||
///
|
||||
/// The first part of the Chain chunks the tarball. The second part checks the return value
|
||||
/// of the send_basebackup_tarball Future that created the tarball.
|
||||
|
||||
type GetBaseBackupStream = futures::stream::Chain<BasebackupChunkedStream, CheckResultStream>;
|
||||
|
||||
fn new_basebackup_response_stream(
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
use cbindgen;
|
||||
|
||||
use std::env;
|
||||
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
@@ -120,7 +120,7 @@ pub extern "C" fn bcomm_start_get_page_v_request<'t>(
|
||||
// Tell the communicator about it
|
||||
bs.submit_request(request_idx);
|
||||
|
||||
return request_idx;
|
||||
request_idx
|
||||
}
|
||||
|
||||
/// Check if a request has completed. Returns:
|
||||
|
||||
@@ -116,7 +116,7 @@ struct EventBuilder<'a> {
|
||||
maker: &'a Maker,
|
||||
}
|
||||
|
||||
impl<'a> std::io::Write for EventBuilder<'a> {
|
||||
impl std::io::Write for EventBuilder<'_> {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
self.event.message.write(buf)
|
||||
}
|
||||
@@ -126,7 +126,7 @@ impl<'a> std::io::Write for EventBuilder<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Drop for EventBuilder<'a> {
|
||||
impl Drop for EventBuilder<'_> {
|
||||
fn drop(&mut self) {
|
||||
let maker = self.maker;
|
||||
let event = std::mem::take(&mut self.event);
|
||||
|
||||
@@ -29,14 +29,7 @@ pub extern "C" fn communicator_worker_process_launch(
|
||||
// Convert the arguments into more convenient Rust types
|
||||
let tenant_id = unsafe { CStr::from_ptr(tenant_id) }.to_str().unwrap();
|
||||
let timeline_id = unsafe { CStr::from_ptr(timeline_id) }.to_str().unwrap();
|
||||
let auth_token = {
|
||||
if auth_token.is_null() {
|
||||
None
|
||||
} else {
|
||||
let c_str = unsafe { CStr::from_ptr(auth_token) };
|
||||
Some(c_str.to_str().unwrap().to_string())
|
||||
}
|
||||
};
|
||||
let auth_token = unsafe { auth_token.as_ref() }.map(|s| s.to_string());
|
||||
let file_cache_path = {
|
||||
if file_cache_path.is_null() {
|
||||
None
|
||||
|
||||
Reference in New Issue
Block a user