WIP Safekeeper broker discovery

Arthur Petukhovsky
2023-10-27 12:24:42 +00:00
parent bd59349af3
commit 73a2c9ec97
11 changed files with 1348 additions and 20 deletions

log.txt (new file, 1209 lines): diff suppressed because one or more lines are too long.


@@ -914,6 +914,7 @@ mod tests {
             safekeeper_connstr: safekeeper_connstr.to_owned(),
             http_connstr: safekeeper_connstr.to_owned(),
             availability_zone: None,
+            is_discovery: false,
         },
         latest_update,
     }


@@ -152,6 +152,10 @@ struct Args {
     /// useful for debugging.
     #[arg(long)]
     current_thread_runtime: bool,
+    /// Disable the task that pushes messages to the broker every second.
+    /// Intended for use in tests.
+    #[arg(long)]
+    disable_periodic_broker_push: bool,
 }

 // Like PathBufValueParser, but allows empty string.
@@ -279,6 +283,7 @@ async fn main() -> anyhow::Result<()> {
         pg_tenant_only_auth,
         http_auth,
         current_thread_runtime: args.current_thread_runtime,
+        disable_periodic_broker_push: args.disable_periodic_broker_push,
     };

     // initialize sentry if SENTRY_DSN is provided


@@ -31,6 +31,12 @@ const PUSH_INTERVAL_MSEC: u64 = 1000;

 /// Push once in a while data about all active timelines to the broker.
 async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
+    if conf.disable_periodic_broker_push {
+        info!("broker push_loop is disabled, doing nothing...");
+        futures::future::pending::<()>().await; // sleep forever
+        return Ok(());
+    }
+
     let mut client =
         storage_broker::connect(conf.broker_endpoint.clone(), conf.broker_keepalive_interval)?;
     let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
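The remainder of push_loop is untouched by this commit and not shown. For orientation, here is a minimal sketch of the kind of periodic publisher the new flag skips, assuming a collect_active_timelines_info helper (a hypothetical name) feeding the client-streaming PublishSafekeeperInfo RPC defined in the proto below:

// Hedged sketch, not the actual implementation: push all active timelines
// once per interval through the client-streaming RPC.
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
    let mut ticker = tokio::time::interval(push_interval);
    loop {
        ticker.tick().await;
        for info in collect_active_timelines_info() { // hypothetical helper
            if tx.send(info).await.is_err() {
                return; // receiver dropped: the broker connection is gone
            }
        }
    }
});
client
    .publish_safekeeper_info(tokio_stream::wrappers::ReceiverStream::new(rx))
    .await?;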


@@ -306,6 +306,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
         backup_lsn: sk_info.backup_lsn.0,
         local_start_lsn: sk_info.local_start_lsn.0,
         availability_zone: None,
+        is_discovery: false,
     };
     let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;


@@ -71,6 +71,7 @@ pub struct SafeKeeperConf {
     pub pg_tenant_only_auth: Option<Arc<JwtAuth>>,
     pub http_auth: Option<Arc<JwtAuth>>,
     pub current_thread_runtime: bool,
+    pub disable_periodic_broker_push: bool,
 }

 impl SafeKeeperConf {
@@ -110,6 +111,7 @@ impl SafeKeeperConf {
             heartbeat_timeout: Duration::new(5, 0),
             max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
             current_thread_runtime: false,
+            disable_periodic_broker_push: false,
         }
     }
 }


@@ -267,6 +267,7 @@ impl SharedState {
             backup_lsn: self.sk.inmem.backup_lsn.0,
             local_start_lsn: self.sk.state.local_start_lsn.0,
             availability_zone: conf.availability_zone.clone(),
+            is_discovery: false,
         }
     }


@@ -136,6 +136,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
             http_connstr: "zenith-1-sk-1.local:7677".to_owned(),
             local_start_lsn: 0,
             availability_zone: None,
+            is_discovery: false,
         };
         counter += 1;
         yield info;


@@ -8,6 +8,14 @@ service BrokerService {
   // Subscribe to safekeeper updates.
   rpc SubscribeSafekeeperInfo(SubscribeSafekeeperInfoRequest) returns (stream SafekeeperTimelineInfo) {};

+  // Subscribe to discovery requests.
+  rpc SubscribeDiscovery(SubscribeDiscoveryRequest) returns (stream DiscoveryRequest) {};
+  // TODO: make a single method for all subscriptions
+
+  // Send a discovery request.
+  rpc SendDiscovery(DiscoveryRequest) returns (google.protobuf.Empty) {};
+
   // Publish safekeeper updates.
   rpc PublishSafekeeperInfo(stream SafekeeperTimelineInfo) returns (google.protobuf.Empty) {};
 }

@@ -42,9 +50,18 @@ message SafekeeperTimelineInfo {
   string http_connstr = 13;
   // Availability zone of a safekeeper.
   optional string availability_zone = 11;
+  bool is_discovery = 14;
 }

 message TenantTimelineId {
   bytes tenant_id = 1;
   bytes timeline_id = 2;
 }
+
+message SubscribeDiscoveryRequest {
+  // TODO: this is a temp struct
+}
+
+message DiscoveryRequest {
+  TenantTimelineId tenant_timeline_id = 1;
+}
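From this service definition tonic also generates a BrokerServiceClient next to the server bindings used below. A hedged sketch of sending a discovery request from the client side (the broker address is a placeholder, and the exact re-export path of the generated client module is assumed):

use storage_broker::proto::broker_service_client::BrokerServiceClient;
use storage_broker::proto::{DiscoveryRequest, TenantTimelineId};

// Hedged sketch: ask the broker to fan a discovery ping out to subscribers.
async fn send_discovery_ping(ttid: TenantTimelineId) -> anyhow::Result<()> {
    // placeholder address; point this at the real broker endpoint
    let mut client = BrokerServiceClient::connect("http://127.0.0.1:50051").await?;
    client
        .send_discovery(DiscoveryRequest {
            tenant_timeline_id: Some(ttid), // prost wraps message fields in Option
        })
        .await?;
    Ok(())
}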


@@ -38,7 +38,10 @@ use metrics::{Encoder, TextEncoder};
 use storage_broker::metrics::{NUM_PUBS, NUM_SUBS_ALL, NUM_SUBS_TIMELINE};
 use storage_broker::proto::broker_service_server::{BrokerService, BrokerServiceServer};
 use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
-use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
+use storage_broker::proto::{
+    DiscoveryRequest, SafekeeperTimelineInfo, SubscribeDiscoveryRequest,
+    SubscribeSafekeeperInfoRequest,
+};
 use storage_broker::{
     parse_proto_ttid, EitherBody, DEFAULT_KEEPALIVE_INTERVAL, DEFAULT_LISTEN_ADDR,
 };
@@ -262,6 +265,27 @@ impl Registry {
             subscriber.id, subscriber.key, subscriber.remote_addr
         );
     }
+
+    pub fn send_msg(&self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> {
+        // send message to subscribers for everything
+        let shared_state = self.shared_state.read();
+        // Err means there are no subscribers, which is fine.
+        shared_state.chan_to_all_subs.send(msg.clone()).ok();
+
+        // send message to per timeline subscribers
+        let ttid =
+            parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or_else(|| {
+                Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
+            })?)?;
+        if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
+            // Err can't happen here, as tx is destroyed only after removing
+            // from the map the last subscriber along with tx.
+            subs.chan
+                .send(msg.clone())
+                .expect("rx is still in the map with zero subscribers");
+        }
+        Ok(())
+    }
 }

 // Private subscriber state.
@@ -293,24 +317,7 @@ struct Publisher {

 impl Publisher {
     // Send msg to relevant subscribers.
     pub fn send_msg(&mut self, msg: &SafekeeperTimelineInfo) -> Result<(), Status> {
-        // send message to subscribers for everything
-        let shared_state = self.registry.shared_state.read();
-        // Err means there is no subscribers, it is fine.
-        shared_state.chan_to_all_subs.send(msg.clone()).ok();
-
-        // send message to per timeline subscribers
-        let ttid =
-            parse_proto_ttid(msg.tenant_timeline_id.as_ref().ok_or_else(|| {
-                Status::new(Code::InvalidArgument, "missing tenant_timeline_id")
-            })?)?;
-        if let Some(subs) = shared_state.chans_to_timeline_subs.get(&ttid) {
-            // Err can't happen here, as tx is destroyed only after removing
-            // from the map the last subscriber along with tx.
-            subs.chan
-                .send(msg.clone())
-                .expect("rx is still in the map with zero subscribers");
-        }
-        Ok(())
+        self.registry.send_msg(msg)
     }
 }
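Note the shape of this refactor: the body of send_msg moves verbatim from Publisher into Registry, and Publisher now delegates. This is what lets the new send_discovery handler below, which holds a Registry but no Publisher, publish through the same fan-out path.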
@@ -392,6 +399,44 @@ impl BrokerService for Broker {
             Box::pin(output) as Self::SubscribeSafekeeperInfoStream
         ))
     }
+
+    type SubscribeDiscoveryStream =
+        Pin<Box<dyn Stream<Item = Result<DiscoveryRequest, Status>> + Send + 'static>>;
+
+    async fn subscribe_discovery(
+        &self,
+        request: Request<SubscribeDiscoveryRequest>,
+    ) -> Result<Response<Self::SubscribeDiscoveryStream>, Status> {
+        todo!()
+    }
+
+    async fn send_discovery(
+        &self,
+        request: Request<DiscoveryRequest>,
+    ) -> Result<Response<()>, Status> {
+        let ttid = request.into_inner().tenant_timeline_id;
+        info!("discovery request {:?}", ttid);
+        let sk_info = SafekeeperTimelineInfo {
+            safekeeper_id: 0,
+            tenant_timeline_id: ttid,
+            term: 0,
+            last_log_term: 0,
+            flush_lsn: 0,
+            commit_lsn: 0,
+            backup_lsn: 0,
+            remote_consistent_lsn: 0,
+            peer_horizon_lsn: 0,
+            local_start_lsn: 0,
+            safekeeper_connstr: String::new(),
+            http_connstr: String::new(),
+            availability_zone: None,
+            is_discovery: true,
+        };
+        self.registry.send_msg(&sk_info)?;
+        Ok(Response::new(()))
+    }
 }

 // We serve only metrics and healthcheck through http1.
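Since discovery pings travel over the same channels as ordinary status updates, subscribers presumably need to branch on the new is_discovery flag. A hypothetical consumer loop, where stream is the tonic::Streaming<SafekeeperTimelineInfo> returned by SubscribeSafekeeperInfo and both handlers are made-up names:

// Hedged sketch of the subscriber side.
while let Some(msg) = stream.message().await? {
    if msg.is_discovery {
        // a peer is looking for safekeepers serving this timeline; a
        // safekeeper could respond by pushing its own info to the broker
        answer_discovery(&msg).await?; // hypothetical
    } else {
        process_update(&msg).await?; // hypothetical
    }
}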
@@ -532,6 +577,7 @@ mod tests {
             http_connstr: "neon-1-sk-1.local:7677".to_owned(),
             local_start_lsn: 0,
             availability_zone: None,
+            is_discovery: false,
         }
     }


@@ -1635,7 +1635,7 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder):
     env = neon_env_builder.init_start()
     tenant_id = env.initial_tenant
-    timeline_id = env.neon_cli.create_branch("test_sk_auth_restart_endpoint")
+    timeline_id = env.neon_cli.create_branch("test_idle_reconnections")

     def collect_stats() -> Dict[str, float]:
         # we need to collect safekeeper_pg_queries_received_total metric from all safekeepers
@@ -1686,3 +1686,42 @@ def test_idle_reconnections(neon_env_builder: NeonEnvBuilder):
     assert final_stats.get("START_REPLICATION", 0) >= 1
     # walproposer should connect to each safekeeper at least once
     assert final_stats.get("START_WAL_PUSH", 0) >= 3
+
+
+def test_broker_pings(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_safekeepers = 3
+    neon_env_builder.enable_safekeeper_remote_storage(RemoteStorageKind.LOCAL_FS)
+    env = neon_env_builder.init_start()
+
+    env.neon_cli.create_branch("test_broker_pings")
+    endpoint = env.endpoints.create_start(
+        "test_broker_pings",
+        config_lines=["shared_buffers=1MB"],
+    )
+    endpoint.safe_psql("create table t(i int, payload text)")
+
+    # Install the extension containing the function needed to clear the buffer cache
+    endpoint.safe_psql("CREATE EXTENSION neon_test_utils")
+
+    def do_something():
+        time.sleep(1)
+        # generate some data to commit WAL on safekeepers
+        endpoint.safe_psql("insert into t select generate_series(1,100), 'action'")
+        # clear the buffers
+        endpoint.safe_psql("select clear_buffer_cache()")
+        # read data to fetch pages from pageserver
+        endpoint.safe_psql("select sum(i) from t")
+
+    do_something()
+    do_something()
+
+    for sk in env.safekeepers:
+        # Disable periodic broker push, so the pageserver won't be able to
+        # discover safekeepers without pings
+        sk.stop().start(extra_opts=["--disable-periodic-broker-push"])
+
+    do_something()
+    do_something()
+
+    # restart pageserver and check that everything still works
+    env.pageserver.stop().start()
+    do_something()
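With periodic push disabled on every safekeeper, the restarted pageserver can no longer learn about them from the once-a-second broker updates, so the final do_something() call exercises exactly the gap that the SendDiscovery/SubscribeDiscovery RPCs introduced in this commit are meant to fill.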