mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-13 03:00:37 +00:00
Compare commits
7 Commits
release-84
...
diko/test_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8b8c2bfeae | ||
|
|
15656082d8 | ||
|
|
4d0c1e8b78 | ||
|
|
3158442a59 | ||
|
|
f006879fb7 | ||
|
|
723c9b5ba2 | ||
|
|
a0d844dfed |
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -4285,6 +4285,7 @@ dependencies = [
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"pageserver_compaction",
|
||||
"pem",
|
||||
"pin-project-lite",
|
||||
"postgres-protocol",
|
||||
"postgres-types",
|
||||
@@ -6001,6 +6002,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"pageserver_api",
|
||||
"parking_lot 0.12.1",
|
||||
"pem",
|
||||
"postgres-protocol",
|
||||
"postgres_backend",
|
||||
"postgres_ffi",
|
||||
|
||||
@@ -78,6 +78,7 @@ metrics.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true # for ResponseErrorMessageExt TOOD refactor that
|
||||
pageserver_compaction.workspace = true
|
||||
pem.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
pq_proto.workspace = true
|
||||
|
||||
@@ -68,6 +68,13 @@ pub(crate) struct Args {
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
/// State shared by all clients
|
||||
#[derive(Debug)]
|
||||
struct SharedState {
|
||||
start_work_barrier: tokio::sync::Barrier,
|
||||
live_stats: LiveStats,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct LiveStats {
|
||||
completed_requests: AtomicU64,
|
||||
@@ -240,24 +247,26 @@ async fn main_impl(
|
||||
all_ranges
|
||||
};
|
||||
|
||||
let live_stats = Arc::new(LiveStats::default());
|
||||
|
||||
let num_live_stats_dump = 1;
|
||||
let num_work_sender_tasks = args.num_clients.get() * timelines.len();
|
||||
let num_main_impl = 1;
|
||||
|
||||
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
|
||||
num_live_stats_dump + num_work_sender_tasks + num_main_impl,
|
||||
));
|
||||
let shared_state = Arc::new(SharedState {
|
||||
start_work_barrier: tokio::sync::Barrier::new(
|
||||
num_live_stats_dump + num_work_sender_tasks + num_main_impl,
|
||||
),
|
||||
live_stats: LiveStats::default(),
|
||||
});
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let ss = shared_state.clone();
|
||||
tokio::spawn({
|
||||
let stats = Arc::clone(&live_stats);
|
||||
let start_work_barrier = Arc::clone(&start_work_barrier);
|
||||
async move {
|
||||
start_work_barrier.wait().await;
|
||||
ss.start_work_barrier.wait().await;
|
||||
loop {
|
||||
let start = std::time::Instant::now();
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
let stats = &ss.live_stats;
|
||||
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
|
||||
let missed = stats.missed.swap(0, Ordering::Relaxed);
|
||||
let elapsed = start.elapsed();
|
||||
@@ -270,14 +279,12 @@ async fn main_impl(
|
||||
}
|
||||
});
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let rps_period = args
|
||||
.per_client_rate
|
||||
.map(|rps_limit| Duration::from_secs_f64(1.0 / (rps_limit as f64)));
|
||||
let make_worker: &dyn Fn(WorkerId) -> Pin<Box<dyn Send + Future<Output = ()>>> = &|worker_id| {
|
||||
let live_stats = live_stats.clone();
|
||||
let start_work_barrier = start_work_barrier.clone();
|
||||
let ss = shared_state.clone();
|
||||
let cancel = cancel.clone();
|
||||
let ranges: Vec<KeyRange> = all_ranges
|
||||
.iter()
|
||||
.filter(|r| r.timeline == worker_id.timeline)
|
||||
@@ -287,85 +294,8 @@ async fn main_impl(
|
||||
rand::distributions::weighted::WeightedIndex::new(ranges.iter().map(|v| v.len()))
|
||||
.unwrap();
|
||||
|
||||
let cancel = cancel.clone();
|
||||
Box::pin(async move {
|
||||
let client =
|
||||
pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut client = client
|
||||
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
start_work_barrier.wait().await;
|
||||
let client_start = Instant::now();
|
||||
let mut ticks_processed = 0;
|
||||
let mut inflight = VecDeque::new();
|
||||
while !cancel.is_cancelled() {
|
||||
// Detect if a request took longer than the RPS rate
|
||||
if let Some(period) = &rps_period {
|
||||
let periods_passed_until_now =
|
||||
usize::try_from(client_start.elapsed().as_micros() / period.as_micros())
|
||||
.unwrap();
|
||||
|
||||
if periods_passed_until_now > ticks_processed {
|
||||
live_stats.missed((periods_passed_until_now - ticks_processed) as u64);
|
||||
}
|
||||
ticks_processed = periods_passed_until_now;
|
||||
}
|
||||
|
||||
while inflight.len() < args.queue_depth.get() {
|
||||
let start = Instant::now();
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(key.is_rel_block_key());
|
||||
let (rel_tag, block_no) = key
|
||||
.to_rel_block()
|
||||
.expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid: 0,
|
||||
request_lsn: if rng.gen_bool(args.req_latest_probability) {
|
||||
Lsn::MAX
|
||||
} else {
|
||||
r.timeline_lsn
|
||||
},
|
||||
not_modified_since: r.timeline_lsn,
|
||||
},
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
client.getpage_send(req).await.unwrap();
|
||||
inflight.push_back(start);
|
||||
}
|
||||
|
||||
let start = inflight.pop_front().unwrap();
|
||||
client.getpage_recv().await.unwrap();
|
||||
let end = Instant::now();
|
||||
live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
STATS.with(|stats| {
|
||||
stats
|
||||
.borrow()
|
||||
.lock()
|
||||
.unwrap()
|
||||
.observe(end.duration_since(start))
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
if let Some(period) = &rps_period {
|
||||
let next_at = client_start
|
||||
+ Duration::from_micros(
|
||||
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
|
||||
);
|
||||
tokio::time::sleep_until(next_at.into()).await;
|
||||
}
|
||||
}
|
||||
client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
|
||||
})
|
||||
};
|
||||
|
||||
@@ -387,7 +317,7 @@ async fn main_impl(
|
||||
};
|
||||
|
||||
info!("waiting for everything to become ready");
|
||||
start_work_barrier.wait().await;
|
||||
shared_state.start_work_barrier.wait().await;
|
||||
info!("work started");
|
||||
if let Some(runtime) = args.runtime {
|
||||
tokio::time::sleep(runtime.into()).await;
|
||||
@@ -416,3 +346,91 @@ async fn main_impl(
|
||||
|
||||
anyhow::Ok(())
|
||||
}
|
||||
|
||||
async fn client_libpq(
|
||||
args: &Args,
|
||||
worker_id: WorkerId,
|
||||
shared_state: Arc<SharedState>,
|
||||
cancel: CancellationToken,
|
||||
rps_period: Option<Duration>,
|
||||
ranges: Vec<KeyRange>,
|
||||
weights: rand::distributions::weighted::WeightedIndex<i128>,
|
||||
) {
|
||||
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut client = client
|
||||
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
shared_state.start_work_barrier.wait().await;
|
||||
let client_start = Instant::now();
|
||||
let mut ticks_processed = 0;
|
||||
let mut inflight = VecDeque::new();
|
||||
while !cancel.is_cancelled() {
|
||||
// Detect if a request took longer than the RPS rate
|
||||
if let Some(period) = &rps_period {
|
||||
let periods_passed_until_now =
|
||||
usize::try_from(client_start.elapsed().as_micros() / period.as_micros()).unwrap();
|
||||
|
||||
if periods_passed_until_now > ticks_processed {
|
||||
shared_state
|
||||
.live_stats
|
||||
.missed((periods_passed_until_now - ticks_processed) as u64);
|
||||
}
|
||||
ticks_processed = periods_passed_until_now;
|
||||
}
|
||||
|
||||
while inflight.len() < args.queue_depth.get() {
|
||||
let start = Instant::now();
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(key.is_rel_block_key());
|
||||
let (rel_tag, block_no) = key
|
||||
.to_rel_block()
|
||||
.expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
hdr: PagestreamRequest {
|
||||
reqid: 0,
|
||||
request_lsn: if rng.gen_bool(args.req_latest_probability) {
|
||||
Lsn::MAX
|
||||
} else {
|
||||
r.timeline_lsn
|
||||
},
|
||||
not_modified_since: r.timeline_lsn,
|
||||
},
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
client.getpage_send(req).await.unwrap();
|
||||
inflight.push_back(start);
|
||||
}
|
||||
|
||||
let start = inflight.pop_front().unwrap();
|
||||
client.getpage_recv().await.unwrap();
|
||||
let end = Instant::now();
|
||||
shared_state.live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
STATS.with(|stats| {
|
||||
stats
|
||||
.borrow()
|
||||
.lock()
|
||||
.unwrap()
|
||||
.observe(end.duration_since(start))
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
if let Some(period) = &rps_period {
|
||||
let next_at = client_start
|
||||
+ Duration::from_micros(
|
||||
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
|
||||
);
|
||||
tokio::time::sleep_until(next_at.into()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -416,8 +416,18 @@ fn start_pageserver(
|
||||
// The storage_broker::connect call needs to happen inside a tokio runtime thread.
|
||||
let broker_client = WALRECEIVER_RUNTIME
|
||||
.block_on(async {
|
||||
let tls_config = storage_broker::ClientTlsConfig::new().ca_certificates(
|
||||
conf.ssl_ca_certs
|
||||
.iter()
|
||||
.map(pem::encode)
|
||||
.map(storage_broker::Certificate::from_pem),
|
||||
);
|
||||
// Note: we do not attempt connecting here (but validate endpoints sanity).
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)
|
||||
storage_broker::connect(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
tls_config,
|
||||
)
|
||||
})
|
||||
.with_context(|| {
|
||||
format!(
|
||||
|
||||
@@ -17,9 +17,10 @@ use once_cell::sync::OnceCell;
|
||||
use pageserver_api::config::{DiskUsageEvictionTaskConfig, MaxVectoredReadBytes};
|
||||
use pageserver_api::models::ImageCompressionAlgorithm;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pem::Pem;
|
||||
use postgres_backend::AuthType;
|
||||
use remote_storage::{RemotePath, RemoteStorageConfig};
|
||||
use reqwest::{Certificate, Url};
|
||||
use reqwest::Url;
|
||||
use storage_broker::Uri;
|
||||
use utils::id::{NodeId, TimelineId};
|
||||
use utils::logging::{LogFormat, SecretString};
|
||||
@@ -67,8 +68,8 @@ pub struct PageServerConf {
|
||||
/// Period to reload certificate and private key from files.
|
||||
/// Default: 60s.
|
||||
pub ssl_cert_reload_period: Duration,
|
||||
/// Trusted root CA certificates to use in https APIs.
|
||||
pub ssl_ca_certs: Vec<Certificate>,
|
||||
/// Trusted root CA certificates to use in https APIs in PEM format.
|
||||
pub ssl_ca_certs: Vec<Pem>,
|
||||
|
||||
/// Current availability zone. Used for traffic metrics.
|
||||
pub availability_zone: Option<String>,
|
||||
@@ -497,7 +498,10 @@ impl PageServerConf {
|
||||
ssl_ca_certs: match ssl_ca_file {
|
||||
Some(ssl_ca_file) => {
|
||||
let buf = std::fs::read(ssl_ca_file)?;
|
||||
Certificate::from_pem_bundle(&buf)?
|
||||
pem::parse_many(&buf)?
|
||||
.into_iter()
|
||||
.filter(|pem| pem.tag() == "CERTIFICATE")
|
||||
.collect()
|
||||
}
|
||||
None => Vec::new(),
|
||||
},
|
||||
|
||||
@@ -8,6 +8,7 @@ use pageserver_api::upcall_api::{
|
||||
ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
|
||||
ValidateRequestTenant, ValidateResponse,
|
||||
};
|
||||
use reqwest::Certificate;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -76,8 +77,8 @@ impl StorageControllerUpcallClient {
|
||||
client = client.default_headers(headers);
|
||||
}
|
||||
|
||||
for ssl_ca_cert in &conf.ssl_ca_certs {
|
||||
client = client.add_root_certificate(ssl_ca_cert.clone());
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
client = client.add_root_certificate(Certificate::from_der(cert.contents())?);
|
||||
}
|
||||
|
||||
Ok(Some(Self {
|
||||
|
||||
@@ -1285,10 +1285,6 @@ impl Timeline {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
if query.is_empty() {
|
||||
return Ok(BTreeMap::default());
|
||||
}
|
||||
|
||||
let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() {
|
||||
Some(ReadPath::new(
|
||||
query.total_keyspace(),
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! FIXME: most of this is copy-paste from mgmt_api.rs ; dedupe into a `reqwest_utils::Client` crate.
|
||||
use pageserver_client::mgmt_api::{Error, ResponseErrorMessageExt};
|
||||
use reqwest::Method;
|
||||
use reqwest::{Certificate, Method};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::error;
|
||||
@@ -34,7 +34,7 @@ impl Client {
|
||||
};
|
||||
let mut http_client = reqwest::Client::builder();
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
http_client = http_client.add_root_certificate(cert.clone());
|
||||
http_client = http_client.add_root_certificate(Certificate::from_der(cert.contents())?);
|
||||
}
|
||||
let http_client = http_client.build()?;
|
||||
|
||||
|
||||
@@ -55,6 +55,7 @@ tokio-util = { workspace = true }
|
||||
tracing.workspace = true
|
||||
url.workspace = true
|
||||
metrics.workspace = true
|
||||
pem.workspace = true
|
||||
postgres_backend.workspace = true
|
||||
postgres_ffi.workspace = true
|
||||
pq_proto.workspace = true
|
||||
|
||||
@@ -16,7 +16,6 @@ use futures::stream::FuturesUnordered;
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use metrics::set_build_info_metric;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use reqwest::Certificate;
|
||||
use safekeeper::defaults::{
|
||||
DEFAULT_CONTROL_FILE_SAVE_INTERVAL, DEFAULT_EVICTION_MIN_RESIDENT, DEFAULT_HEARTBEAT_TIMEOUT,
|
||||
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PARTIAL_BACKUP_CONCURRENCY,
|
||||
@@ -373,7 +372,10 @@ async fn main() -> anyhow::Result<()> {
|
||||
Some(ssl_ca_file) => {
|
||||
tracing::info!("Using ssl root CA file: {ssl_ca_file:?}");
|
||||
let buf = tokio::fs::read(ssl_ca_file).await?;
|
||||
Certificate::from_pem_bundle(&buf)?
|
||||
pem::parse_many(&buf)?
|
||||
.into_iter()
|
||||
.filter(|pem| pem.tag() == "CERTIFICATE")
|
||||
.collect()
|
||||
}
|
||||
None => Vec::new(),
|
||||
};
|
||||
|
||||
@@ -24,6 +24,15 @@ use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
const RETRY_INTERVAL_MSEC: u64 = 1000;
|
||||
const PUSH_INTERVAL_MSEC: u64 = 1000;
|
||||
|
||||
fn make_tls_config(conf: &SafeKeeperConf) -> storage_broker::ClientTlsConfig {
|
||||
storage_broker::ClientTlsConfig::new().ca_certificates(
|
||||
conf.ssl_ca_certs
|
||||
.iter()
|
||||
.map(pem::encode)
|
||||
.map(storage_broker::Certificate::from_pem),
|
||||
)
|
||||
}
|
||||
|
||||
/// Push once in a while data about all active timelines to the broker.
|
||||
async fn push_loop(
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
@@ -37,8 +46,11 @@ async fn push_loop(
|
||||
|
||||
let active_timelines_set = global_timelines.get_global_broker_active_set();
|
||||
|
||||
let mut client =
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||
let mut client = storage_broker::connect(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
make_tls_config(&conf),
|
||||
)?;
|
||||
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
|
||||
|
||||
let outbound = async_stream::stream! {
|
||||
@@ -81,8 +93,11 @@ async fn pull_loop(
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
stats: Arc<BrokerStats>,
|
||||
) -> Result<()> {
|
||||
let mut client =
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||
let mut client = storage_broker::connect(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
make_tls_config(&conf),
|
||||
)?;
|
||||
|
||||
// TODO: subscribe only to local timelines instead of all
|
||||
let request = SubscribeSafekeeperInfoRequest {
|
||||
@@ -134,8 +149,11 @@ async fn discover_loop(
|
||||
global_timelines: Arc<GlobalTimelines>,
|
||||
stats: Arc<BrokerStats>,
|
||||
) -> Result<()> {
|
||||
let mut client =
|
||||
storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
|
||||
let mut client = storage_broker::connect(
|
||||
conf.broker_endpoint.clone(),
|
||||
conf.broker_keepalive_interval,
|
||||
make_tls_config(&conf),
|
||||
)?;
|
||||
|
||||
let request = SubscribeByFilterRequest {
|
||||
types: vec![TypeSubscription {
|
||||
|
||||
@@ -14,6 +14,7 @@ use http_utils::json::{json_request, json_response};
|
||||
use http_utils::request::{ensure_no_body, parse_query_param, parse_request_param};
|
||||
use http_utils::{RequestExt, RouterBuilder};
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use pem::Pem;
|
||||
use postgres_ffi::WAL_SEGMENT_SIZE;
|
||||
use safekeeper_api::models::{
|
||||
AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
|
||||
@@ -230,14 +231,20 @@ async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
let conf = get_conf(&request);
|
||||
let global_timelines = get_global_timelines(&request);
|
||||
|
||||
let resp = pull_timeline::handle_request(
|
||||
data,
|
||||
conf.sk_auth_token.clone(),
|
||||
conf.ssl_ca_certs.clone(),
|
||||
global_timelines,
|
||||
)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
let ca_certs = conf
|
||||
.ssl_ca_certs
|
||||
.iter()
|
||||
.map(Pem::contents)
|
||||
.map(reqwest::Certificate::from_der)
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map_err(|e| {
|
||||
ApiError::InternalServerError(anyhow::anyhow!("failed to parse CA certs: {e}"))
|
||||
})?;
|
||||
|
||||
let resp =
|
||||
pull_timeline::handle_request(data, conf.sk_auth_token.clone(), ca_certs, global_timelines)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::OK, resp)
|
||||
}
|
||||
|
||||
|
||||
@@ -6,8 +6,8 @@ use std::time::Duration;
|
||||
|
||||
use camino::Utf8PathBuf;
|
||||
use once_cell::sync::Lazy;
|
||||
use pem::Pem;
|
||||
use remote_storage::RemoteStorageConfig;
|
||||
use reqwest::Certificate;
|
||||
use storage_broker::Uri;
|
||||
use tokio::runtime::Runtime;
|
||||
use utils::auth::SwappableJwtAuth;
|
||||
@@ -120,7 +120,7 @@ pub struct SafeKeeperConf {
|
||||
pub ssl_key_file: Utf8PathBuf,
|
||||
pub ssl_cert_file: Utf8PathBuf,
|
||||
pub ssl_cert_reload_period: Duration,
|
||||
pub ssl_ca_certs: Vec<Certificate>,
|
||||
pub ssl_ca_certs: Vec<Pem>,
|
||||
pub use_https_safekeeper_api: bool,
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::time::SystemTime;
|
||||
use anyhow::{Context, bail};
|
||||
use futures::StreamExt;
|
||||
use postgres_protocol::message::backend::ReplicationMessage;
|
||||
use reqwest::Certificate;
|
||||
use safekeeper_api::Term;
|
||||
use safekeeper_api::membership::INVALID_GENERATION;
|
||||
use safekeeper_api::models::{PeerInfo, TimelineStatus};
|
||||
@@ -241,7 +242,7 @@ async fn recover(
|
||||
|
||||
let mut client = reqwest::Client::builder();
|
||||
for cert in &conf.ssl_ca_certs {
|
||||
client = client.add_root_certificate(cert.clone());
|
||||
client = client.add_root_certificate(Certificate::from_der(cert.contents())?);
|
||||
}
|
||||
let client = client
|
||||
.build()
|
||||
|
||||
@@ -87,7 +87,12 @@ fn tli_from_u64(i: u64) -> Vec<u8> {
|
||||
async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>, i: u64) {
|
||||
let mut client = match client {
|
||||
Some(c) => c,
|
||||
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
|
||||
None => storage_broker::connect(
|
||||
DEFAULT_ENDPOINT,
|
||||
Duration::from_secs(5),
|
||||
storage_broker::ClientTlsConfig::new(),
|
||||
)
|
||||
.unwrap(),
|
||||
};
|
||||
|
||||
let ttid = ProtoTenantTimelineId {
|
||||
@@ -119,7 +124,12 @@ async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>,
|
||||
async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
let mut client = match client {
|
||||
Some(c) => c,
|
||||
None => storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap(),
|
||||
None => storage_broker::connect(
|
||||
DEFAULT_ENDPOINT,
|
||||
Duration::from_secs(5),
|
||||
storage_broker::ClientTlsConfig::new(),
|
||||
)
|
||||
.unwrap(),
|
||||
};
|
||||
let mut counter: u64 = 0;
|
||||
|
||||
@@ -164,7 +174,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
let h = tokio::spawn(progress_reporter(counters.clone()));
|
||||
|
||||
let c = storage_broker::connect(DEFAULT_ENDPOINT, Duration::from_secs(5)).unwrap();
|
||||
let c = storage_broker::connect(
|
||||
DEFAULT_ENDPOINT,
|
||||
Duration::from_secs(5),
|
||||
storage_broker::ClientTlsConfig::new(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
for i in 0..args.num_subs {
|
||||
let c = Some(c.clone());
|
||||
|
||||
@@ -4,7 +4,7 @@ use proto::TenantTimelineId as ProtoTenantTimelineId;
|
||||
use proto::broker_service_client::BrokerServiceClient;
|
||||
use tonic::Status;
|
||||
use tonic::codegen::StdError;
|
||||
use tonic::transport::{Channel, ClientTlsConfig, Endpoint};
|
||||
use tonic::transport::{Channel, Endpoint};
|
||||
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
||||
|
||||
// Code generated by protobuf.
|
||||
@@ -20,6 +20,7 @@ pub mod metrics;
|
||||
|
||||
// Re-exports to avoid direct tonic dependency in user crates.
|
||||
pub use hyper::Uri;
|
||||
pub use tonic::transport::{Certificate, ClientTlsConfig};
|
||||
pub use tonic::{Code, Request, Streaming};
|
||||
|
||||
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
|
||||
@@ -38,7 +39,11 @@ pub type BrokerClientChannel = BrokerServiceClient<Channel>;
|
||||
//
|
||||
// NB: this function is not async, but still must be run on a tokio runtime thread
|
||||
// because that's a requirement of tonic_endpoint.connect_lazy()'s Channel::new call.
|
||||
pub fn connect<U>(endpoint: U, keepalive_interval: Duration) -> anyhow::Result<BrokerClientChannel>
|
||||
pub fn connect<U>(
|
||||
endpoint: U,
|
||||
keepalive_interval: Duration,
|
||||
tls_config: ClientTlsConfig,
|
||||
) -> anyhow::Result<BrokerClientChannel>
|
||||
where
|
||||
U: std::convert::TryInto<Uri>,
|
||||
U::Error: std::error::Error + Send + Sync + 'static,
|
||||
@@ -54,8 +59,7 @@ where
|
||||
rustls::crypto::ring::default_provider()
|
||||
.install_default()
|
||||
.ok();
|
||||
let tls = ClientTlsConfig::new();
|
||||
tonic_endpoint = tonic_endpoint.tls_config(tls)?;
|
||||
tonic_endpoint = tonic_endpoint.tls_config(tls_config)?;
|
||||
}
|
||||
tonic_endpoint = tonic_endpoint
|
||||
.http2_keep_alive_interval(keepalive_interval)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from fixtures.log_helper import log
|
||||
@@ -122,7 +123,11 @@ class Workload:
|
||||
if allow_recreate:
|
||||
endpoint.safe_psql(f"DROP TABLE IF EXISTS {self.table};")
|
||||
endpoint.safe_psql(f"CREATE TABLE {self.table} (id INTEGER PRIMARY KEY, val text);")
|
||||
log.info("XXX LSN 1: %s", str(endpoint.safe_psql("SELECT pg_current_wal_insert_lsn(), pg_current_wal_lsn(), pg_current_wal_flush_lsn();")[0]))
|
||||
endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
|
||||
log.info("XXX LSN 2: %s", str(endpoint.safe_psql("SELECT pg_current_wal_insert_lsn(), pg_current_wal_lsn(), pg_current_wal_flush_lsn();")[0]))
|
||||
time.sleep(1)
|
||||
log.info("XXX LSN 3: %s", str(endpoint.safe_psql("SELECT pg_current_wal_insert_lsn(), pg_current_wal_lsn(), pg_current_wal_flush_lsn();")[0]))
|
||||
last_flush_lsn_upload(
|
||||
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
|
||||
)
|
||||
|
||||
@@ -2,6 +2,8 @@ from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from fixtures.workload import Workload
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
@@ -72,3 +74,12 @@ def test_pageserver_catchup_while_compute_down(neon_env_builder: NeonEnvBuilder)
|
||||
|
||||
cur.execute("SELECT count(*) FROM foo")
|
||||
assert cur.fetchone() == (20000,)
|
||||
|
||||
|
||||
def test_workload_stuck(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
workload = Workload(env, env.initial_tenant, env.initial_timeline)
|
||||
workload.init()
|
||||
workload.write_rows(10)
|
||||
workload.validate()
|
||||
|
||||
Reference in New Issue
Block a user