Replace etcd with storage_broker.

This is the replacement itself; the broker binary landed earlier. See
docs/storage_broker.md.

ref
https://github.com/neondatabase/neon/pull/2466
https://github.com/neondatabase/neon/issues/2394
Author: Arseny Sher
Date: 2022-09-16 13:44:28 +03:00
Committed by: Arseny Sher
Parent: 249d77c720
Commit: 32662ff1c4
56 changed files with 1064 additions and 2222 deletions
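
At a glance, the operator-facing change is that the old list of etcd URLs (plus an optional key prefix) becomes a single gRPC endpoint. A minimal sketch of the new shape, assuming only the `storage_broker::DEFAULT_ENDPOINT` and `Uri` items visible in the diffs below:

```rust
// A hedged sketch, not part of this commit. The old config took a list of
// etcd URLs plus an optional key prefix; the new one takes a single endpoint.
use storage_broker::{Uri, DEFAULT_ENDPOINT};

fn main() {
    // Before: broker_endpoints = ['http://127.0.0.1:2379'], broker_etcd_prefix = 'neon'
    // After:  broker_endpoint = 'http://127.0.0.1:50051' (the compiled-in default)
    let endpoint: Uri = DEFAULT_ENDPOINT
        .parse()
        .expect("default broker endpoint must be a valid URI");
    println!("broker_endpoint = '{endpoint}'");
}
```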

View File

@@ -3,7 +3,7 @@ storage:
bucket_name: neon-storage-ireland
bucket_region: eu-west-1
console_mgmt_base_url: http://neon-stress-console.local
etcd_endpoints: neon-stress-etcd.local:2379
broker_endpoint: http://storage-broker.neon-stress.local:50051
safekeeper_enable_s3_offload: 'false'
pageserver_config_stub:
pg_distrib_dir: /usr/local

View File

@@ -3,7 +3,7 @@ storage:
bucket_name: neon-prod-storage-ap-southeast-1
bucket_region: ap-southeast-1
console_mgmt_base_url: http://console-release.local
etcd_endpoints: etcd-0.ap-southeast-1.aws.neon.tech:2379
broker_endpoint: https://storage-broker.epsilon.ap-southeast-1.internal.aws.neon.tech:443
pageserver_config_stub:
pg_distrib_dir: /usr/local
remote_storage:

View File

@@ -3,7 +3,7 @@ storage:
bucket_name: neon-prod-storage-eu-central-1
bucket_region: eu-central-1
console_mgmt_base_url: http://console-release.local
etcd_endpoints: etcd-0.eu-central-1.aws.neon.tech:2379
broker_endpoint: https://storage-broker.gamma.eu-central-1.internal.aws.neon.tech:443
pageserver_config_stub:
pg_distrib_dir: /usr/local
remote_storage:

View File

@@ -3,7 +3,7 @@ storage:
bucket_name: neon-prod-storage-us-east-2
bucket_region: us-east-2
console_mgmt_base_url: http://console-release.local
etcd_endpoints: etcd-0.us-east-2.aws.neon.tech:2379
broker_endpoint: https://storage-broker.delta.us-east-2.internal.aws.neon.tech:443
pageserver_config_stub:
pg_distrib_dir: /usr/local
remote_storage:

View File

@@ -4,7 +4,7 @@ storage:
console_mgmt_base_url: http://console-release.local
bucket_name: zenith-storage-oregon
bucket_region: us-west-2
etcd_endpoints: zenith-1-etcd.local:2379
broker_endpoint: http://storage-broker.prod.local:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
remote_storage:

View File

@@ -3,7 +3,7 @@ storage:
bucket_name: neon-dev-storage-eu-west-1
bucket_region: eu-west-1
console_mgmt_base_url: http://console-staging.local
etcd_endpoints: etcd-0.eu-west-1.aws.neon.build:2379
broker_endpoint: https://storage-broker.zeta.eu-west-1.internal.aws.neon.build:443
pageserver_config_stub:
pg_distrib_dir: /usr/local
remote_storage:

View File

@@ -3,7 +3,7 @@ storage:
bucket_name: zenith-staging-storage-us-east-1
bucket_region: us-east-1
console_mgmt_base_url: http://console-staging.local
etcd_endpoints: etcd-0.us-east-2.aws.neon.build:2379
broker_endpoint: http://storage-broker.staging.local:50051
pageserver_config_stub:
pg_distrib_dir: /usr/local
remote_storage:

View File

@@ -3,7 +3,7 @@ storage:
bucket_name: neon-staging-storage-us-east-2
bucket_region: us-east-2
console_mgmt_base_url: http://console-staging.local
etcd_endpoints: etcd-0.us-east-2.aws.neon.build:2379
broker_endpoint: https://storage-broker.beta.us-east-2.internal.aws.neon.build:443
pageserver_config_stub:
pg_distrib_dir: /usr/local
remote_storage:

View File

@@ -6,7 +6,7 @@ After=network.target auditd.service
Type=simple
User=pageserver
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/v14/lib SENTRY_DSN={{ SENTRY_URL_PAGESERVER }}
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoints=['{{ etcd_endpoints }}']" -D /storage/pageserver/data
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoint='{{ broker_endpoint }}'" -D /storage/pageserver/data
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT

View File

@@ -6,7 +6,7 @@ After=network.target auditd.service
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/v14/lib SENTRY_DSN={{ SENTRY_URL_SAFEKEEPER }}
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ safekeeper_s3_prefix }}"}'
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}{{ hostname_suffix }}:6500 --listen-http {{ inventory_hostname }}{{ hostname_suffix }}:7676 -D /storage/safekeeper/data --broker-endpoint={{ broker_endpoint }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ safekeeper_s3_prefix }}"}'
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT

View File

@@ -888,7 +888,8 @@ jobs:
helm upgrade ${{ matrix.proxy_job }} neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
helm upgrade ${{ matrix.proxy_job }}-scram neondatabase/neon-proxy --namespace neon-proxy --install -f .github/helm-values/${{ matrix.proxy_config }}-scram.yaml --set image.tag=${DOCKER_TAG} --set settings.sentryUrl=${{ secrets.SENTRY_URL_PROXY }} --wait --timeout 15m0s
deploy-storage-broker-staging:
deploy-storage-broker:
name: deploy storage broker on old staging and old prod
runs-on: [ self-hosted, dev, x64 ]
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/base:latest
# Compute image isn't strictly required for proxy deploy, but let's still wait for it to run all deploy jobs consistently.

Cargo.lock (generated): 901 lines changed; file diff suppressed because it is too large.

View File

@@ -79,7 +79,7 @@ COPY --from=pg-build /home/nonroot/postgres_install.tar.gz /data/
RUN mkdir -p /data/.neon/ && chown -R neon:neon /data/.neon/ \
&& /usr/local/bin/pageserver -D /data/.neon/ --init \
-c "id=1234" \
-c "broker_endpoints=['http://etcd:2379']" \
-c "broker_endpoint='http://storage_broker:50051'" \
-c "pg_distrib_dir='/usr/local/'" \
-c "listen_pg_addr='0.0.0.0:6400'" \
-c "listen_http_addr='0.0.0.0:9898'"

View File

@@ -26,12 +26,12 @@ See developer documentation in [/docs/SUMMARY.md](/docs/SUMMARY.md) for more inf
* On Ubuntu or Debian, this set of packages should be sufficient to build the code:
```bash
apt install build-essential libtool libreadline-dev zlib1g-dev flex bison libseccomp-dev \
libssl-dev clang pkg-config libpq-dev etcd cmake postgresql-client protobuf-compiler
libssl-dev clang pkg-config libpq-dev cmake postgresql-client protobuf-compiler
```
* On Fedora, these packages are needed:
```bash
dnf install flex bison readline-devel zlib-devel openssl-devel \
libseccomp-devel perl clang cmake etcd postgresql postgresql-contrib protobuf-compiler
libseccomp-devel perl clang cmake postgresql postgresql-contrib protobuf-compiler
```
2. [Install Rust](https://www.rust-lang.org/tools/install)
@@ -44,7 +44,7 @@ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
1. Install XCode and dependencies
```
xcode-select --install
brew install protobuf etcd openssl flex bison
brew install protobuf openssl flex bison
```
2. [Install Rust](https://www.rust-lang.org/tools/install)
@@ -123,12 +123,12 @@ Stopped pageserver 1 process with pid 2545906
# start pageserver and safekeeper
> ./target/debug/neon_local start
Starting etcd broker using "/usr/bin/etcd"
etcd started, pid: 2545996
Starting neon broker at 127.0.0.1:50051
storage_broker started, pid: 2918372
Starting pageserver at '127.0.0.1:64000' in '.neon'.
pageserver started, pid: 2546005
pageserver started, pid: 2918386
Starting safekeeper at '127.0.0.1:5454' in '.neon/safekeepers/sk1'.
safekeeper 1 started, pid: 2546041
safekeeper 1 started, pid: 2918437
# start postgres compute node
> ./target/debug/neon_local pg start main

View File

@@ -25,5 +25,7 @@ url = "2.2.2"
pageserver_api = { path = "../libs/pageserver_api" }
postgres_connection = { path = "../libs/postgres_connection" }
safekeeper_api = { path = "../libs/safekeeper_api" }
# Note: main broker code is inside the binary crate, so linking with the library shouldn't be heavy.
storage_broker = { version = "0.1", path = "../storage_broker" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -10,5 +10,5 @@ id = 1
pg_port = 5454
http_port = 7676
[etcd_broker]
broker_endpoints = ['http://127.0.0.1:2379']
[broker]
listen_addr = '127.0.0.1:50051'

View File

@@ -8,10 +8,10 @@
use anyhow::{anyhow, bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use control_plane::compute::ComputeControlPlane;
use control_plane::local_env::{EtcdBroker, LocalEnv};
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
use control_plane::{etcd, local_env};
use control_plane::{broker, local_env};
use pageserver_api::models::TimelineInfo;
use pageserver_api::{
DEFAULT_HTTP_LISTEN_ADDR as DEFAULT_PAGESERVER_HTTP_ADDR,
@@ -22,9 +22,10 @@ use safekeeper_api::{
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};
use std::collections::{BTreeSet, HashMap};
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
use utils::{
auth::{Claims, Scope},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -41,13 +42,12 @@ project_git_version!(GIT_VERSION);
const DEFAULT_PG_VERSION: &str = "14";
fn default_conf(etcd_binary_path: &Path) -> String {
fn default_conf() -> String {
format!(
r#"
# Default built-in configuration, defined in main.rs
[etcd_broker]
broker_endpoints = ['http://localhost:2379']
etcd_binary_path = '{etcd_binary_path}'
[broker]
listen_addr = '{DEFAULT_BROKER_ADDR}'
[pageserver]
id = {DEFAULT_PAGESERVER_ID}
@@ -60,7 +60,6 @@ id = {DEFAULT_SAFEKEEPER_ID}
pg_port = {DEFAULT_SAFEKEEPER_PG_PORT}
http_port = {DEFAULT_SAFEKEEPER_HTTP_PORT}
"#,
etcd_binary_path = etcd_binary_path.display(),
pageserver_auth_type = AuthType::Trust,
)
}
@@ -298,7 +297,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
})?
} else {
// Built-in default config
default_conf(&EtcdBroker::locate_etcd()?)
default_conf()
};
let pg_version = init_match
@@ -807,14 +806,14 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> {
etcd::start_etcd_process(env)?;
broker::start_broker_process(env)?;
let pageserver = PageServerNode::from_env(env);
// Postgres nodes are not started automatically
if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) {
eprintln!("pageserver start failed: {e}");
try_stop_etcd_process(env);
try_stop_storage_broker_process(env);
exit(1);
}
@@ -822,7 +821,7 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow
let safekeeper = SafekeeperNode::from_env(env, node);
if let Err(e) = safekeeper.start() {
eprintln!("safekeeper '{}' start failed: {e}", safekeeper.id);
try_stop_etcd_process(env);
try_stop_storage_broker_process(env);
exit(1);
}
}
@@ -854,14 +853,14 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<
}
}
try_stop_etcd_process(env);
try_stop_storage_broker_process(env);
Ok(())
}
fn try_stop_etcd_process(env: &local_env::LocalEnv) {
if let Err(e) = etcd::stop_etcd_process(env) {
eprintln!("etcd stop failed: {e}");
fn try_stop_storage_broker_process(env: &local_env::LocalEnv) {
if let Err(e) = broker::stop_broker_process(env) {
eprintln!("neon broker stop failed: {e}");
}
}

View File

@@ -0,0 +1,48 @@
use anyhow::Context;
use std::path::PathBuf;
use crate::{background_process, local_env};
pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
let broker = &env.broker;
let listen_addr = &broker.listen_addr;
print!("Starting neon broker at {}", listen_addr);
let args = [format!("--listen-addr={listen_addr}")];
let client = reqwest::blocking::Client::new();
background_process::start_process(
"storage_broker",
&env.base_data_dir,
&env.storage_broker_bin(),
&args,
[],
background_process::InitialPidFile::Create(&storage_broker_pid_file_path(env)),
|| {
let url = broker.client_url();
let status_url = url.join("status").with_context(|| {
format!("Failed to append /status path to broker endpoint {url}",)
})?;
let request = client
.get(status_url)
.build()
.with_context(|| format!("Failed to construct request to broker endpoint {url}"))?;
match client.execute(request) {
Ok(resp) => Ok(resp.status().is_success()),
Err(_) => Ok(false),
}
},
)
.context("Failed to spawn storage_broker subprocess")?;
Ok(())
}
pub fn stop_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
background_process::stop_process(true, "storage_broker", &storage_broker_pid_file_path(env))
}
fn storage_broker_pid_file_path(env: &local_env::LocalEnv) -> PathBuf {
env.base_data_dir.join("storage_broker.pid")
}

View File

@@ -1,78 +0,0 @@
use std::{fs, path::PathBuf};
use anyhow::Context;
use crate::{background_process, local_env};
pub fn start_etcd_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
let etcd_broker = &env.etcd_broker;
print!(
"Starting etcd broker using {:?}",
etcd_broker.etcd_binary_path
);
let etcd_data_dir = env.base_data_dir.join("etcd");
fs::create_dir_all(&etcd_data_dir)
.with_context(|| format!("Failed to create etcd data dir {etcd_data_dir:?}"))?;
let client_urls = etcd_broker.comma_separated_endpoints();
let args = [
format!("--data-dir={}", etcd_data_dir.display()),
format!("--listen-client-urls={client_urls}"),
format!("--advertise-client-urls={client_urls}"),
// Set --quota-backend-bytes to keep the etcd virtual memory
// size smaller. Our test etcd clusters are very small.
// See https://github.com/etcd-io/etcd/issues/7910
"--quota-backend-bytes=100000000".to_string(),
// etcd doesn't compact (vacuum) with default settings,
// enable it to prevent space exhaustion.
"--auto-compaction-mode=revision".to_string(),
"--auto-compaction-retention=1".to_string(),
];
let pid_file_path = etcd_pid_file_path(env);
let client = reqwest::blocking::Client::new();
background_process::start_process(
"etcd",
&etcd_data_dir,
&etcd_broker.etcd_binary_path,
&args,
[],
background_process::InitialPidFile::Create(&pid_file_path),
|| {
for broker_endpoint in &etcd_broker.broker_endpoints {
let request = broker_endpoint
.join("health")
.with_context(|| {
format!(
"Failed to append /health path to broker endopint {}",
broker_endpoint
)
})
.and_then(|url| {
client.get(&url.to_string()).build().with_context(|| {
format!("Failed to construct request to etcd endpoint {url}")
})
})?;
if client.execute(request).is_ok() {
return Ok(true);
}
}
Ok(false)
},
)
.context("Failed to spawn etcd subprocess")?;
Ok(())
}
pub fn stop_etcd_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
background_process::stop_process(true, "etcd", &etcd_pid_file_path(env))
}
fn etcd_pid_file_path(env: &local_env::LocalEnv) -> PathBuf {
env.base_data_dir.join("etcd.pid")
}

View File

@@ -8,8 +8,8 @@
//
mod background_process;
pub mod broker;
pub mod compute;
pub mod etcd;
pub mod local_env;
pub mod pageserver;
pub mod postgresql_conf;

View File

@@ -4,12 +4,16 @@
//! script which will use local paths.
use anyhow::{bail, ensure, Context};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::collections::HashMap;
use std::env;
use std::fs;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use utils::{
@@ -62,7 +66,7 @@ pub struct LocalEnv {
#[serde(default)]
pub private_key_path: PathBuf,
pub etcd_broker: EtcdBroker,
pub broker: NeonBroker,
pub pageserver: PageServerConf,
@@ -78,67 +82,26 @@ pub struct LocalEnv {
branch_name_mappings: HashMap<String, Vec<(TenantId, TimelineId)>>,
}
/// Etcd broker config for cluster internal communication.
#[serde_as]
/// Broker config for cluster internal communication.
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EtcdBroker {
/// A prefix to add to any key when pushing/polling etcd from a node.
#[serde(default)]
pub broker_etcd_prefix: Option<String>,
/// Broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'.
#[serde(default)]
#[serde_as(as = "Vec<DisplayFromStr>")]
pub broker_endpoints: Vec<Url>,
/// Etcd binary path to use.
#[serde(default)]
pub etcd_binary_path: PathBuf,
#[serde(default)]
pub struct NeonBroker {
/// Broker listen address for storage nodes coordination, e.g. '127.0.0.1:50051'.
pub listen_addr: SocketAddr,
}
impl EtcdBroker {
pub fn locate_etcd() -> anyhow::Result<PathBuf> {
let which_output = Command::new("which")
.arg("etcd")
.output()
.context("Failed to run 'which etcd' command")?;
let stdout = String::from_utf8_lossy(&which_output.stdout);
ensure!(
which_output.status.success(),
"'which etcd' invocation failed. Status: {}, stdout: {stdout}, stderr: {}",
which_output.status,
String::from_utf8_lossy(&which_output.stderr)
);
let etcd_path = PathBuf::from(stdout.trim());
ensure!(
etcd_path.is_file(),
"'which etcd' invocation was successful, but the path it returned is not a file or does not exist: {}",
etcd_path.display()
);
Ok(etcd_path)
// Dummy Default impl to satisfy Deserialize derive.
impl Default for NeonBroker {
fn default() -> Self {
NeonBroker {
listen_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0),
}
}
}
pub fn comma_separated_endpoints(&self) -> String {
self.broker_endpoints
.iter()
.map(|url| {
// URL by default adds a '/' path at the end, which is not what etcd CLI wants.
let url_string = url.as_str();
if url_string.ends_with('/') {
&url_string[0..url_string.len() - 1]
} else {
url_string
}
})
.fold(String::new(), |mut comma_separated_urls, url| {
if !comma_separated_urls.is_empty() {
comma_separated_urls.push(',');
}
comma_separated_urls.push_str(url);
comma_separated_urls
})
impl NeonBroker {
pub fn client_url(&self) -> Url {
Url::parse(&format!("http://{}", self.listen_addr)).expect("failed to construct url")
}
}
@@ -234,6 +197,10 @@ impl LocalEnv {
self.neon_distrib_dir.join("safekeeper")
}
pub fn storage_broker_bin(&self) -> PathBuf {
self.neon_distrib_dir.join("storage_broker")
}
pub fn pg_data_dirs_path(&self) -> PathBuf {
self.base_data_dir.join("pgdatadirs").join("tenants")
}
@@ -511,8 +478,8 @@ mod tests {
"failed to parse simple config {simple_conf_toml}, reason: {simple_conf_parse_result:?}"
);
let string_to_replace = "broker_endpoints = ['http://127.0.0.1:2379']";
let spoiled_url_str = "broker_endpoints = ['!@$XOXO%^&']";
let string_to_replace = "listen_addr = '127.0.0.1:50051'";
let spoiled_url_str = "listen_addr = '!@$XOXO%^&'";
let spoiled_url_toml = simple_conf_toml.replace(string_to_replace, spoiled_url_str);
assert!(
spoiled_url_toml.contains(spoiled_url_str),

View File

@@ -96,13 +96,8 @@ impl PageServerNode {
}
}
pub fn initialize(
&self,
create_tenant: Option<TenantId>,
initial_timeline_id: Option<TimelineId>,
config_overrides: &[&str],
pg_version: u32,
) -> anyhow::Result<TimelineId> {
// pageserver conf overrides defined by neon_local configuration.
fn neon_local_overrides(&self) -> Vec<String> {
let id = format!("id={}", self.env.pageserver.id);
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotes etc.
let pg_distrib_dir_param = format!(
@@ -117,41 +112,32 @@ impl PageServerNode {
);
let listen_pg_addr_param =
format!("listen_pg_addr='{}'", self.env.pageserver.listen_pg_addr);
let broker_endpoints_param = format!(
"broker_endpoints=[{}]",
self.env
.etcd_broker
.broker_endpoints
.iter()
.map(|url| format!("'{url}'"))
.collect::<Vec<_>>()
.join(",")
);
let broker_etcd_prefix_param = self
.env
.etcd_broker
.broker_etcd_prefix
.as_ref()
.map(|prefix| format!("broker_etcd_prefix='{prefix}'"));
let broker_endpoint_param = format!("broker_endpoint='{}'", self.env.broker.client_url());
let mut init_config_overrides = config_overrides.to_vec();
init_config_overrides.push(&id);
init_config_overrides.push(&pg_distrib_dir_param);
init_config_overrides.push(&authg_type_param);
init_config_overrides.push(&listen_http_addr_param);
init_config_overrides.push(&listen_pg_addr_param);
init_config_overrides.push(&broker_endpoints_param);
if let Some(broker_etcd_prefix_param) = broker_etcd_prefix_param.as_deref() {
init_config_overrides.push(broker_etcd_prefix_param);
}
let mut overrides = vec![
id,
pg_distrib_dir_param,
authg_type_param,
listen_http_addr_param,
listen_pg_addr_param,
broker_endpoint_param,
];
if self.env.pageserver.auth_type != AuthType::Trust {
init_config_overrides.push("auth_validation_public_key_path='auth_public_key.pem'");
overrides.push("auth_validation_public_key_path='auth_public_key.pem'".to_owned());
}
overrides
}
pub fn initialize(
&self,
create_tenant: Option<TenantId>,
initial_timeline_id: Option<TimelineId>,
config_overrides: &[&str],
pg_version: u32,
) -> anyhow::Result<TimelineId> {
let mut pageserver_process = self
.start_node(&init_config_overrides, &self.env.base_data_dir, true)
.start_node(config_overrides, &self.env.base_data_dir, true)
.with_context(|| {
format!(
"Failed to start a process for pageserver {}",
@@ -224,6 +210,9 @@ impl PageServerNode {
datadir: &Path,
update_config: bool,
) -> anyhow::Result<Child> {
let mut overrides = self.neon_local_overrides();
overrides.extend(config_overrides.iter().map(|&c| c.to_owned()));
print!(
"Starting pageserver at '{}' in '{}'",
self.pg_connection_config.raw_address(),
@@ -242,7 +231,7 @@ impl PageServerNode {
args.push("--update-config");
}
for config_override in config_overrides {
for config_override in &overrides {
args.extend(["-c", config_override]);
}

View File

@@ -131,13 +131,8 @@ impl SafekeeperNode {
args.push("--no-sync");
}
let comma_separated_endpoints = self.env.etcd_broker.comma_separated_endpoints();
if !comma_separated_endpoints.is_empty() {
args.extend(["--broker-endpoints", &comma_separated_endpoints]);
}
if let Some(prefix) = self.env.etcd_broker.broker_etcd_prefix.as_deref() {
args.extend(["--broker-etcd-prefix", prefix]);
}
let broker_endpoint = format!("{}", self.env.broker.client_url());
args.extend(["--broker-endpoint", &broker_endpoint]);
let mut backup_threads = String::new();
if let Some(threads) = self.conf.backup_threads {

View File

@@ -1,29 +1,6 @@
version: '3'
services:
etcd:
restart: always
image: quay.io/coreos/etcd:v3.5.4
ports:
- 2379:2379
- 2380:2380
environment:
# This significantly speeds up etcd, and we don't need data persistency there anyway.
ETCD_UNSAFE_NO_FSYNC: "1"
command:
- "etcd"
- "--auto-compaction-mode=revision"
- "--auto-compaction-retention=1"
- "--name=etcd-cluster"
- "--initial-cluster-state=new"
- "--initial-cluster-token=etcd-cluster-1"
- "--initial-cluster=etcd-cluster=http://etcd:2380"
- "--initial-advertise-peer-urls=http://etcd:2380"
- "--advertise-client-urls=http://etcd:2379"
- "--listen-client-urls=http://0.0.0.0:2379"
- "--listen-peer-urls=http://0.0.0.0:2380"
- "--quota-backend-bytes=134217728" # 128 MB
minio:
restart: always
image: quay.io/minio/minio:RELEASE.2022-10-20T00-55-09Z
@@ -56,7 +33,7 @@ services:
restart: always
image: ${REPOSITORY:-neondatabase}/neon:${TAG:-latest}
environment:
- BROKER_ENDPOINT='http://etcd:2379'
- BROKER_ENDPOINT='http://storage_broker:50051'
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=password
#- RUST_BACKTRACE=1
@@ -68,7 +45,7 @@ services:
- "-c"
command:
- "/usr/local/bin/pageserver -D /data/.neon/
-c \"broker_endpoints=[$$BROKER_ENDPOINT]\"
-c \"broker_endpoint=$$BROKER_ENDPOINT\"
-c \"listen_pg_addr='0.0.0.0:6400'\"
-c \"listen_http_addr='0.0.0.0:9898'\"
-c \"remote_storage={endpoint='http://minio:9000',
@@ -76,7 +53,7 @@ services:
bucket_region='eu-north-1',
prefix_in_bucket='/pageserver/'}\""
depends_on:
- etcd
- storage_broker
- minio_create_buckets
safekeeper1:
@@ -85,7 +62,7 @@ services:
environment:
- SAFEKEEPER_ADVERTISE_URL=safekeeper1:5454
- SAFEKEEPER_ID=1
- BROKER_ENDPOINT=http://etcd:2379
- BROKER_ENDPOINT=http://storage_broker:50051
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=password
#- RUST_BACKTRACE=1
@@ -99,14 +76,14 @@ services:
- "safekeeper --listen-pg=$$SAFEKEEPER_ADVERTISE_URL
--listen-http='0.0.0.0:7676'
--id=$$SAFEKEEPER_ID
--broker-endpoints=$$BROKER_ENDPOINT
--broker-endpoint=$$BROKER_ENDPOINT
-D /data
--remote-storage=\"{endpoint='http://minio:9000',
bucket_name='neon',
bucket_region='eu-north-1',
prefix_in_bucket='/safekeeper/'}\""
depends_on:
- etcd
- storage_broker
- minio_create_buckets
safekeeper2:
@@ -115,7 +92,7 @@ services:
environment:
- SAFEKEEPER_ADVERTISE_URL=safekeeper2:5454
- SAFEKEEPER_ID=2
- BROKER_ENDPOINT=http://etcd:2379
- BROKER_ENDPOINT=http://storage_broker:50051
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=password
#- RUST_BACKTRACE=1
@@ -129,14 +106,14 @@ services:
- "safekeeper --listen-pg=$$SAFEKEEPER_ADVERTISE_URL
--listen-http='0.0.0.0:7676'
--id=$$SAFEKEEPER_ID
--broker-endpoints=$$BROKER_ENDPOINT
--broker-endpoint=$$BROKER_ENDPOINT
-D /data
--remote-storage=\"{endpoint='http://minio:9000',
bucket_name='neon',
bucket_region='eu-north-1',
prefix_in_bucket='/safekeeper/'}\""
depends_on:
- etcd
- storage_broker
- minio_create_buckets
safekeeper3:
@@ -145,7 +122,7 @@ services:
environment:
- SAFEKEEPER_ADVERTISE_URL=safekeeper3:5454
- SAFEKEEPER_ID=3
- BROKER_ENDPOINT=http://etcd:2379
- BROKER_ENDPOINT=http://storage_broker:50051
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=password
#- RUST_BACKTRACE=1
@@ -159,16 +136,25 @@ services:
- "safekeeper --listen-pg=$$SAFEKEEPER_ADVERTISE_URL
--listen-http='0.0.0.0:7676'
--id=$$SAFEKEEPER_ID
--broker-endpoints=$$BROKER_ENDPOINT
--broker-endpoint=$$BROKER_ENDPOINT
-D /data
--remote-storage=\"{endpoint='http://minio:9000',
bucket_name='neon',
bucket_region='eu-north-1',
prefix_in_bucket='/safekeeper/'}\""
depends_on:
- etcd
- storage_broker
- minio_create_buckets
storage_broker:
restart: always
image: ${REPOSITORY:-neondatabase}/neon:${TAG:-latest}
ports:
- 50051:50051
command:
- "storage_broker"
- "--listen-addr=0.0.0.0:50051"
compute:
restart: always
build:

View File

@@ -2,7 +2,7 @@
### Overview
We use JWT tokens in communication between almost all components (compute, pageserver, safekeeper, CLI) regardless of the protocol used (HTTP/PostgreSQL).
Etcd currently has no authentication.
storage_broker currently has no authentication.
Authentication is optional and is disabled by default for easier debugging.
It is used in some tests, though.
Note that we do not cover authentication with `pg.neon.tech` here.
@@ -84,7 +84,7 @@ the scope is the tenant and the token is usually passed through the
Pageserver keeps track of multiple tenants, each having multiple timelines.
For each timeline, it connects to the corresponding Safekeeper.
Information about "corresponding Safekeeper" is published by Safekeepers
in the Etcd, but they do not publish access tokens, otherwise what is
in the storage_broker, but they do not publish access tokens, otherwise what is
the point of authentication.
Pageserver keeps a connection to some set of Safekeepers, which

View File

@@ -23,9 +23,9 @@ We build all images after a successful `release` tests run and push automaticall
You can see a [docker compose](https://docs.docker.com/compose/) example to create a neon cluster in [/docker-compose/docker-compose.yml](/docker-compose/docker-compose.yml). It creates the following containers.
- etcd x 1
- pageserver x 1
- safekeeper x 3
- storage_broker x 1
- compute x 1
- MinIO x 1 # This is Amazon S3 compatible object storage
@@ -41,7 +41,7 @@ $ cd docker-compose/docker-compose.yml
$ docker-compose down # remove the containers if they exist
$ PG_VERSION=15 TAG=2221 docker-compose up --build -d # You can specify the postgres and image version
Creating network "dockercompose_default" with the default driver
Creating dockercompose_etcd3_1 ...
Creating docker-compose_storage_broker_1 ... done
(...omit...)
```

View File

@@ -10,7 +10,6 @@ the values in the config file, if any are specified for the same key and get int
```toml
# Initial configuration file created by 'pageserver --init'
listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'
@@ -25,13 +24,12 @@ max_file_descriptors = '100'
# initial superuser role name to use when creating a new tenant
initial_superuser_name = 'cloud_admin'
broker_etcd_prefix = 'neon'
broker_endpoints = ['some://etcd']
broker_endpoint = 'http://127.0.0.1:50051'
# [remote_storage]
```
The config above shows default values for all basic pageserver settings, besides `broker_endpoints`: that one has to be set by the user,
The config above shows default values for all basic pageserver settings, besides `broker_endpoint`: that one has to be set by the user,
see the corresponding section below.
Pageserver uses default values for all files that are missing in the config, so it's not a hard error to leave the config blank.
Yet, it validates the config values it can (e.g. postgres install dir) and errors if the validation fails, refusing to start.
@@ -50,16 +48,10 @@ Example: `${PAGESERVER_BIN} -c "checkpoint_timeout = '10 m'" -c "remote_storage=
Note that TOML distinguishes between strings and integers; the former require single or double quotes around them.
#### broker_endpoints
#### broker_endpoint
A list of endpoints (etcd currently) to connect to and pull information from.
Mandatory, with no default, since it requires etcd to be started as a separate process
and its connection URL to be specified separately.
#### broker_etcd_prefix
A prefix to add for every etcd key used, to separate one group of related instances from another, in the same cluster.
Default is `neon`.
A storage broker endpoint to connect to and pull information from. Default is
`'http://127.0.0.1:50051'`.
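For example (hostname illustrative, not from this commit), it can be overridden like any other setting via a `-c` flag: `${PAGESERVER_BIN} -c "broker_endpoint='http://storage-broker:50051'"`.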
#### checkpoint_distance

View File

@@ -1,18 +0,0 @@
[package]
name = "etcd_broker"
version = "0.1.0"
edition = "2021"
[dependencies]
etcd-client = "0.9.0"
regex = "1.4.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "2.0"
once_cell = "1.13.0"
utils = { path = "../utils" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
tokio = "1"
tracing = "0.1"
thiserror = "1"

View File

@@ -1,209 +0,0 @@
//! A set of primitives to access shared data/updates, propagated via etcd broker (not persistent).
//! Intended to connect services to each other, not to store their data.
/// All broker keys, that are used when dealing with etcd.
pub mod subscription_key;
/// All broker values, possible to use when dealing with etcd.
pub mod subscription_value;
use std::str::FromStr;
use serde::de::DeserializeOwned;
use subscription_key::SubscriptionKey;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::*;
use crate::subscription_key::SubscriptionFullKey;
pub use etcd_client::*;
/// Default value to use as a prefix for all etcd keys.
/// This allows isolating safekeeper/pageserver groups in the same etcd cluster.
pub const DEFAULT_NEON_BROKER_ETCD_PREFIX: &str = "neon";
/// A way to control the data retrieval from a certain subscription.
pub struct BrokerSubscription<V> {
/// An unbounded channel to fetch the relevant etcd updates from.
pub value_updates: mpsc::UnboundedReceiver<BrokerUpdate<V>>,
key: SubscriptionKey,
/// A subscription task handle, to allow waiting on it for the task to complete.
/// Both the updates channel and the handle require `&mut`, so it's better to keep
/// both `pub` to allow using both in the same structures without borrow checker complaining.
pub watcher_handle: JoinHandle<Result<(), BrokerError>>,
watcher: Watcher,
}
impl<V> BrokerSubscription<V> {
/// Cancels the subscription, stopping the data poller and waiting for it to shut down.
pub async fn cancel(mut self) -> Result<(), BrokerError> {
self.watcher.cancel().await.map_err(|e| {
BrokerError::EtcdClient(
e,
format!("Failed to cancel broker subscription, kind: {:?}", self.key),
)
})?;
match (&mut self.watcher_handle).await {
Ok(res) => res,
Err(e) => {
if e.is_cancelled() {
// don't error on the tasks that are cancelled already
Ok(())
} else {
Err(BrokerError::InternalError(format!(
"Panicked during broker subscription task, kind: {:?}, error: {e}",
self.key
)))
}
}
}
}
}
impl<V> Drop for BrokerSubscription<V> {
fn drop(&mut self) {
// we poll data from etcd into the channel in the same struct, so if the whole struct gets dropped,
// no more data is used by the receiver and it's safe to cancel and drop the whole etcd subscription task.
self.watcher_handle.abort();
}
}
/// An update from the etcd broker.
pub struct BrokerUpdate<V> {
/// Etcd generation version; the bigger it is, the more recent the data.
pub etcd_version: i64,
/// Etcd key for the corresponding value, parsed from the broker KV.
pub key: SubscriptionFullKey,
/// Current etcd value, parsed from the broker KV.
pub value: V,
}
#[derive(Debug, thiserror::Error)]
pub enum BrokerError {
#[error("Etcd client error: {0}. Context: {1}")]
EtcdClient(etcd_client::Error, String),
#[error("Error during parsing etcd key: {0}")]
KeyNotParsed(String),
#[error("Internal error: {0}")]
InternalError(String),
}
/// Creates a background task to poll etcd for timeline updates from safekeepers.
/// Stops and returns `Err` on any error during etcd communication.
/// Watches the key changes until either the watcher is cancelled via etcd or the subscription cancellation handle,
/// exiting normally in such cases.
/// Etcd values are parsed as JSON into a type specified in the generic parameter.
pub async fn subscribe_for_json_values<V>(
client: &mut Client,
key: SubscriptionKey,
) -> Result<BrokerSubscription<V>, BrokerError>
where
V: DeserializeOwned + Send + 'static,
{
subscribe_for_values(client, key, |_, value_str| {
match serde_json::from_str::<V>(value_str) {
Ok(value) => Some(value),
Err(e) => {
error!("Failed to parse value str '{value_str}': {e}");
None
}
}
})
.await
}
/// Same as [`subscribe_for_json_values`], but allows specifying a custom parser for an etcd value string.
pub async fn subscribe_for_values<P, V>(
client: &mut Client,
key: SubscriptionKey,
value_parser: P,
) -> Result<BrokerSubscription<V>, BrokerError>
where
V: Send + 'static,
P: Fn(SubscriptionFullKey, &str) -> Option<V> + Send + 'static,
{
info!("Subscribing to broker value updates, key: {key:?}");
let subscription_key = key.clone();
let (watcher, mut stream) = client
.watch(key.watch_key(), Some(WatchOptions::new().with_prefix()))
.await
.map_err(|e| {
BrokerError::EtcdClient(
e,
format!("Failed to init the watch for subscription {key:?}"),
)
})?;
let (value_updates_sender, value_updates_receiver) = mpsc::unbounded_channel();
let watcher_handle = tokio::spawn(async move {
while let Some(resp) = stream.message().await.map_err(|e| BrokerError::InternalError(format!(
"Failed to get messages from the subscription stream, kind: {:?}, error: {e}", key.kind
)))? {
if resp.canceled() {
info!("Watch for timeline updates subscription was canceled, exiting");
break;
}
let events = resp.events();
debug!("Processing {} events", events.len());
for event in events {
if EventType::Put == event.event_type() {
if let Some(new_etcd_kv) = event.kv() {
match parse_etcd_kv(new_etcd_kv, &value_parser, &key.cluster_prefix) {
Ok(Some((key, value))) => if let Err(e) = value_updates_sender.send(BrokerUpdate {
etcd_version: new_etcd_kv.version(),
key,
value,
}) {
info!("Broker value updates for key {key:?} sender got dropped, exiting: {e}");
break;
},
Ok(None) => debug!("Ignoring key {key:?} : no value was returned by the parser"),
Err(BrokerError::KeyNotParsed(e)) => debug!("Unexpected key {key:?} for timeline update: {e}"),
Err(e) => error!("Failed to represent etcd KV {new_etcd_kv:?}: {e}"),
};
}
}
}
}
Ok(())
}.instrument(info_span!("etcd_broker")));
Ok(BrokerSubscription {
key: subscription_key,
value_updates: value_updates_receiver,
watcher_handle,
watcher,
})
}
fn parse_etcd_kv<P, V>(
kv: &KeyValue,
value_parser: &P,
cluster_prefix: &str,
) -> Result<Option<(SubscriptionFullKey, V)>, BrokerError>
where
P: Fn(SubscriptionFullKey, &str) -> Option<V>,
{
let key_str = kv.key_str().map_err(|e| {
BrokerError::EtcdClient(e, "Failed to extract key str out of etcd KV".to_string())
})?;
let value_str = kv.value_str().map_err(|e| {
BrokerError::EtcdClient(e, "Failed to extract value str out of etcd KV".to_string())
})?;
if !key_str.starts_with(cluster_prefix) {
return Err(BrokerError::KeyNotParsed(format!(
"KV has unexpected key '{key_str}' that does not start with cluster prefix {cluster_prefix}"
)));
}
let key = SubscriptionFullKey::from_str(&key_str[cluster_prefix.len()..]).map_err(|e| {
BrokerError::KeyNotParsed(format!("Failed to parse KV key '{key_str}': {e}"))
})?;
Ok(value_parser(key, value_str).map(|value| (key, value)))
}

View File

@@ -1,310 +0,0 @@
//! Etcd broker keys, used in the project and shared between instances.
//! The keys are split into two categories:
//!
//! * [`SubscriptionFullKey`] full key format: `<cluster_prefix>/<tenant>/<timeline>/<node_kind>/<operation>/<node_id>`
//! Always returned from etcd in this form; always starts with the user-provided key.
//!
//! * [`SubscriptionKey`] user input key format: always partial, since it's unknown which `node_id`'s are available.
//! Full key always starts with the user input one, due to etcd subscription properties.
use std::{fmt::Display, str::FromStr};
use once_cell::sync::Lazy;
use regex::{Captures, Regex};
use utils::id::{NodeId, TenantId, TenantTimelineId};
/// The subscription kind to the timeline updates from safekeeper.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SubscriptionKey {
/// Generic cluster prefix, allowing multiple logical groups to share the same etcd instance.
pub cluster_prefix: String,
/// The subscription kind.
pub kind: SubscriptionKind,
}
/// All currently possible key kinds of an etcd broker subscription.
/// Etcd treats every key that starts with the given subscription key as matching and
/// returns it as part of the subscription.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SubscriptionKind {
/// Get every update in etcd.
All,
/// Get etcd updates for any timeline of a certain tenant, affected by any operation from any node kind.
TenantTimelines(TenantId),
/// Get etcd updates for a certain timeline of a tenant, affected by any operation from any node kind.
Timeline(TenantTimelineId),
/// Get etcd timeline updates, specific to a certain node kind.
Node(TenantTimelineId, NodeKind),
/// Get etcd timeline updates for a certain operation on specific nodes.
Operation(TenantTimelineId, NodeKind, OperationKind),
}
/// All kinds of nodes, able to write into etcd.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NodeKind {
Safekeeper,
Pageserver,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationKind {
Safekeeper(SkOperationKind),
}
/// Current operations, running inside the safekeeper node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SkOperationKind {
TimelineInfo,
WalBackup,
}
static SUBSCRIPTION_FULL_KEY_REGEX: Lazy<Regex> = Lazy::new(|| {
Regex::new("/([[:xdigit:]]+)/([[:xdigit:]]+)/([^/]+)/([^/]+)/([[:digit:]]+)$")
.expect("wrong subscription full etcd key regex")
});
/// Full key, received from etcd during any of the component's work.
/// No other etcd keys are considered during system's work.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionFullKey {
pub id: TenantTimelineId,
pub node_kind: NodeKind,
pub operation: OperationKind,
pub node_id: NodeId,
}
impl SubscriptionKey {
/// Subscribes for all etcd updates.
pub fn all(cluster_prefix: String) -> Self {
SubscriptionKey {
cluster_prefix,
kind: SubscriptionKind::All,
}
}
/// Subscribes to a given timeline info updates from safekeepers.
pub fn sk_timeline_info(cluster_prefix: String, timeline: TenantTimelineId) -> Self {
Self {
cluster_prefix,
kind: SubscriptionKind::Operation(
timeline,
NodeKind::Safekeeper,
OperationKind::Safekeeper(SkOperationKind::TimelineInfo),
),
}
}
/// Subscribes to all timeline updates during specific operations, running on the corresponding nodes.
pub fn operation(
cluster_prefix: String,
timeline: TenantTimelineId,
node_kind: NodeKind,
operation: OperationKind,
) -> Self {
Self {
cluster_prefix,
kind: SubscriptionKind::Operation(timeline, node_kind, operation),
}
}
/// Etcd key to use for watching updates of a certain timeline from safekeepers.
pub fn watch_key(&self) -> String {
let cluster_prefix = &self.cluster_prefix;
match self.kind {
SubscriptionKind::All => cluster_prefix.to_string(),
SubscriptionKind::TenantTimelines(tenant_id) => {
format!("{cluster_prefix}/{tenant_id}")
}
SubscriptionKind::Timeline(id) => {
format!("{cluster_prefix}/{id}")
}
SubscriptionKind::Node(id, node_kind) => {
format!("{cluster_prefix}/{id}/{node_kind}")
}
SubscriptionKind::Operation(id, node_kind, operation_kind) => {
format!("{cluster_prefix}/{id}/{node_kind}/{operation_kind}")
}
}
}
}
impl Display for OperationKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OperationKind::Safekeeper(o) => o.fmt(f),
}
}
}
impl FromStr for OperationKind {
type Err = String;
fn from_str(operation_kind_str: &str) -> Result<Self, Self::Err> {
match operation_kind_str {
"timeline_info" => Ok(OperationKind::Safekeeper(SkOperationKind::TimelineInfo)),
"wal_backup" => Ok(OperationKind::Safekeeper(SkOperationKind::WalBackup)),
_ => Err(format!("Unknown operation kind: {operation_kind_str}")),
}
}
}
impl Display for SubscriptionFullKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
id,
node_kind,
operation,
node_id,
} = self;
write!(f, "{id}/{node_kind}/{operation}/{node_id}")
}
}
impl FromStr for SubscriptionFullKey {
type Err = String;
fn from_str(subscription_kind_str: &str) -> Result<Self, Self::Err> {
let key_captures = match SUBSCRIPTION_FULL_KEY_REGEX.captures(subscription_kind_str) {
Some(captures) => captures,
None => {
return Err(format!(
"Subscription kind str does not match a subscription full key regex {}",
SUBSCRIPTION_FULL_KEY_REGEX.as_str()
));
}
};
Ok(Self {
id: TenantTimelineId::new(
parse_capture(&key_captures, 1)?,
parse_capture(&key_captures, 2)?,
),
node_kind: parse_capture(&key_captures, 3)?,
operation: parse_capture(&key_captures, 4)?,
node_id: NodeId(parse_capture(&key_captures, 5)?),
})
}
}
fn parse_capture<T>(caps: &Captures, index: usize) -> Result<T, String>
where
T: FromStr,
<T as FromStr>::Err: Display,
{
let capture_match = caps
.get(index)
.ok_or_else(|| format!("Failed to get capture match at index {index}"))?
.as_str();
capture_match.parse().map_err(|e| {
format!(
"Failed to parse {} from {capture_match}: {e}",
std::any::type_name::<T>()
)
})
}
impl Display for NodeKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Safekeeper => write!(f, "safekeeper"),
Self::Pageserver => write!(f, "pageserver"),
}
}
}
impl FromStr for NodeKind {
type Err = String;
fn from_str(node_kind_str: &str) -> Result<Self, Self::Err> {
match node_kind_str {
"safekeeper" => Ok(Self::Safekeeper),
"pageserver" => Ok(Self::Pageserver),
_ => Err(format!("Invalid node kind: {node_kind_str}")),
}
}
}
impl Display for SkOperationKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::TimelineInfo => write!(f, "timeline_info"),
Self::WalBackup => write!(f, "wal_backup"),
}
}
}
impl FromStr for SkOperationKind {
type Err = String;
fn from_str(operation_str: &str) -> Result<Self, Self::Err> {
match operation_str {
"timeline_info" => Ok(Self::TimelineInfo),
"wal_backup" => Ok(Self::WalBackup),
_ => Err(format!("Invalid operation: {operation_str}")),
}
}
}
#[cfg(test)]
mod tests {
use utils::id::TimelineId;
use super::*;
#[test]
fn full_cluster_key_parsing() {
let prefix = "neon";
let node_kind = NodeKind::Safekeeper;
let operation_kind = OperationKind::Safekeeper(SkOperationKind::WalBackup);
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let id = TenantTimelineId::new(tenant_id, timeline_id);
let node_id = NodeId(1);
let timeline_subscription_keys = [
SubscriptionKey {
cluster_prefix: prefix.to_string(),
kind: SubscriptionKind::All,
},
SubscriptionKey {
cluster_prefix: prefix.to_string(),
kind: SubscriptionKind::TenantTimelines(tenant_id),
},
SubscriptionKey {
cluster_prefix: prefix.to_string(),
kind: SubscriptionKind::Timeline(id),
},
SubscriptionKey {
cluster_prefix: prefix.to_string(),
kind: SubscriptionKind::Node(id, node_kind),
},
SubscriptionKey {
cluster_prefix: prefix.to_string(),
kind: SubscriptionKind::Operation(id, node_kind, operation_kind),
},
];
let full_key_string = format!(
"{}/{node_id}",
timeline_subscription_keys.last().unwrap().watch_key()
);
for key in timeline_subscription_keys {
assert!(full_key_string.starts_with(&key.watch_key()), "Full key '{full_key_string}' should start with any of the keys, but {key:?} did not match");
}
let full_key = SubscriptionFullKey::from_str(&full_key_string).unwrap_or_else(|e| {
panic!("Failed to parse {full_key_string} as a subscription full key: {e}")
});
assert_eq!(
full_key,
SubscriptionFullKey {
id,
node_kind,
operation: operation_kind,
node_id
}
)
}
}

View File

@@ -1,38 +0,0 @@
//! Module for the values to put into etcd.
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::lsn::Lsn;
/// Data about safekeeper's timeline. Fields made optional for easy migrations.
#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SkTimelineInfo {
/// Term of the last entry.
pub last_log_term: Option<u64>,
/// LSN of the last record.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub flush_lsn: Option<Lsn>,
/// Up to which LSN safekeeper regards its WAL as committed.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub commit_lsn: Option<Lsn>,
/// LSN up to which safekeeper has backed WAL.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub backup_lsn: Option<Lsn>,
/// LSN of last checkpoint uploaded by pageserver.
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub remote_consistent_lsn: Option<Lsn>,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub peer_horizon_lsn: Option<Lsn>,
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(default)]
pub local_start_lsn: Option<Lsn>,
/// A connection string to use for WAL receiving.
#[serde(default)]
pub safekeeper_connstr: Option<String>,
}

View File

@@ -22,3 +22,40 @@ pub struct TimelineCreateRequest {
// If not passed, it is assigned to the beginning of commit_lsn segment.
pub local_start_lsn: Option<Lsn>,
}
fn lsn_invalid() -> Lsn {
Lsn::INVALID
}
/// Data about safekeeper's timeline, mirrors broker.proto.
#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SkTimelineInfo {
/// Term of the last entry.
pub last_log_term: Option<u64>,
/// LSN of the last record.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub flush_lsn: Lsn,
/// Up to which LSN safekeeper regards its WAL as committed.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub commit_lsn: Lsn,
/// LSN up to which safekeeper has backed WAL.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub backup_lsn: Lsn,
/// LSN of last checkpoint uploaded by pageserver.
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub remote_consistent_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub peer_horizon_lsn: Lsn,
#[serde_as(as = "DisplayFromStr")]
#[serde(default = "lsn_invalid")]
pub local_start_lsn: Lsn,
/// A connection string to use for WAL receiving.
#[serde(default)]
pub safekeeper_connstr: Option<String>,
}
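
As a rough illustration of the serde attributes above (a sketch, not from this commit; the `safekeeper_api::models` path, the JSON values, and a `serde_json` dependency are assumptions), LSN fields deserialize from Postgres-style strings and fall back to `Lsn::INVALID` when absent:

```rust
use safekeeper_api::models::SkTimelineInfo; // module path assumed
use utils::lsn::Lsn;

fn main() -> serde_json::Result<()> {
    // commit_lsn arrives as a Postgres-style LSN string; flush_lsn is omitted.
    let json = r#"{"last_log_term": 1,
                   "commit_lsn": "0/16B5A50",
                   "safekeeper_connstr": "host=sk1 port=5454"}"#;
    let info: SkTimelineInfo = serde_json::from_str(json)?;
    // Omitted LSN fields take the `lsn_invalid` default rather than erroring.
    assert_eq!(info.flush_lsn, Lsn::INVALID);
    // DisplayFromStr serializes LSNs back to the same string form.
    println!("{}", serde_json::to_string_pretty(&info)?);
    Ok(())
}
```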

View File

@@ -59,13 +59,13 @@ tracing = "0.1.36"
url = "2"
walkdir = "2.3.2"
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }
pageserver_api = { path = "../libs/pageserver_api" }
postgres_connection = { path = "../libs/postgres_connection" }
postgres_ffi = { path = "../libs/postgres_ffi" }
pq_proto = { path = "../libs/pq_proto" }
remote_storage = { path = "../libs/remote_storage" }
storage_broker = { version = "0.1", path = "../storage_broker" }
tenant_size_model = { path = "../libs/tenant_size_model" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -247,7 +247,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
// start profiler (if enabled)
let profiler_guard = profiling::init_profiler(conf);
WALRECEIVER_RUNTIME.block_on(pageserver::walreceiver::init_etcd_client(conf))?;
WALRECEIVER_RUNTIME.block_on(pageserver::walreceiver::init_broker_client(conf))?;
// initialize authentication for incoming connections
let auth = match &conf.auth_type {

View File

@@ -7,6 +7,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use remote_storage::{RemotePath, RemoteStorageConfig};
use std::env;
use storage_broker::Uri;
use utils::crashsafe::path_with_suffix_extension;
use utils::id::ConnectionId;
@@ -18,7 +19,7 @@ use std::sync::Arc;
use std::time::Duration;
use toml_edit;
use toml_edit::{Document, Item};
use url::Url;
use utils::{
id::{NodeId, TenantId, TimelineId},
logging::LogFormat,
@@ -39,6 +40,7 @@ pub mod defaults {
DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
DEFAULT_PG_LISTEN_PORT,
};
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
@@ -59,7 +61,6 @@ pub mod defaults {
pub const DEFAULT_CONFIG_FILE: &str = formatcp!(
r###"
# Initial configuration file created by 'pageserver --init'
#listen_pg_addr = '{DEFAULT_PG_LISTEN_ADDR}'
#listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}'
@@ -71,6 +72,8 @@ pub mod defaults {
# initial superuser role name to use when creating a new tenant
#initial_superuser_name = '{DEFAULT_SUPERUSER}'
#broker_endpoint = '{BROKER_DEFAULT_ENDPOINT}'
#log_format = '{DEFAULT_LOG_FORMAT}'
#concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}'
@@ -132,12 +135,8 @@ pub struct PageServerConf {
pub profiling: ProfilingConfig,
pub default_tenant_conf: TenantConf,
/// A prefix to add in etcd brokers before every key.
/// Can be used for isolating different pageserver groups within the same etcd cluster.
pub broker_etcd_prefix: String,
/// Etcd broker endpoints to connect to.
pub broker_endpoints: Vec<Url>,
/// Storage broker endpoint to connect to.
pub broker_endpoint: Uri,
pub log_format: LogFormat,
@@ -148,8 +147,7 @@ pub struct PageServerConf {
/// We do not want to store this in a PageServerConf because the latter may be logged
/// and/or serialized at a whim, while the token is secret. Currently this token is the
/// same for accessing all tenants/timelines, but may become per-tenant/per-timeline in
/// the future, more tokens and auth may arrive for etcd and/or its rewrite (see
/// https://github.com/neondatabase/neon/issues/2394), completely changing the logic.
/// the future; more tokens and auth may arrive for storage broker, completely changing the logic.
/// Hence, we resort to a global variable for now instead of passing the token from the
/// startup code to the connection code through a dozen layers.
pub static SAFEKEEPER_AUTH_TOKEN: OnceCell<Arc<String>> = OnceCell::new();
@@ -216,8 +214,7 @@ struct PageServerConfigBuilder {
id: BuilderValue<NodeId>,
profiling: BuilderValue<ProfilingConfig>,
broker_etcd_prefix: BuilderValue<String>,
broker_endpoints: BuilderValue<Vec<Url>>,
broker_endpoint: BuilderValue<Uri>,
log_format: BuilderValue<LogFormat>,
@@ -247,8 +244,9 @@ impl Default for PageServerConfigBuilder {
remote_storage_config: Set(None),
id: NotSet,
profiling: Set(ProfilingConfig::Disabled),
broker_etcd_prefix: Set(etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string()),
broker_endpoints: Set(Vec::new()),
broker_endpoint: Set(storage_broker::DEFAULT_ENDPOINT
.parse()
.expect("failed to parse default broker endpoint")),
log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()),
concurrent_tenant_size_logical_size_queries: Set(ConfigurableSemaphore::default()),
@@ -308,12 +306,8 @@ impl PageServerConfigBuilder {
self.remote_storage_config = BuilderValue::Set(remote_storage_config)
}
pub fn broker_endpoints(&mut self, broker_endpoints: Vec<Url>) {
self.broker_endpoints = BuilderValue::Set(broker_endpoints)
}
pub fn broker_etcd_prefix(&mut self, broker_etcd_prefix: String) {
self.broker_etcd_prefix = BuilderValue::Set(broker_etcd_prefix)
pub fn broker_endpoint(&mut self, broker_endpoint: Uri) {
self.broker_endpoint = BuilderValue::Set(broker_endpoint)
}
pub fn id(&mut self, node_id: NodeId) {
@@ -368,12 +362,9 @@ impl PageServerConfigBuilder {
profiling: self.profiling.ok_or(anyhow!("missing profiling"))?,
// TenantConf is handled separately
default_tenant_conf: TenantConf::default(),
broker_endpoints: self
.broker_endpoints
broker_endpoint: self
.broker_endpoint
.ok_or(anyhow!("No broker endpoint provided"))?,
broker_etcd_prefix: self
.broker_etcd_prefix
.ok_or(anyhow!("missing broker_etcd_prefix"))?,
log_format: self.log_format.ok_or(anyhow!("missing log_format"))?,
concurrent_tenant_size_logical_size_queries: self
.concurrent_tenant_size_logical_size_queries
@@ -540,17 +531,7 @@ impl PageServerConf {
}
"id" => builder.id(NodeId(parse_toml_u64(key, item)?)),
"profiling" => builder.profiling(parse_toml_from_str(key, item)?),
"broker_etcd_prefix" => builder.broker_etcd_prefix(parse_toml_string(key, item)?),
"broker_endpoints" => builder.broker_endpoints(
parse_toml_array(key, item)?
.into_iter()
.map(|endpoint_str| {
endpoint_str.parse::<Url>().with_context(|| {
format!("Array item {endpoint_str} for key {key} is not a valid url endpoint")
})
})
.collect::<anyhow::Result<_>>()?,
),
"broker_endpoint" => builder.broker_endpoint(parse_toml_string(key, item)?.parse().context("failed to parse broker endpoint")?),
"log_format" => builder.log_format(
LogFormat::from_config(&parse_toml_string(key, item)?)?
),
@@ -677,8 +658,7 @@ impl PageServerConf {
remote_storage_config: None,
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::dummy_conf(),
broker_endpoints: Vec::new(),
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
}
@@ -730,22 +710,6 @@ where
})
}
fn parse_toml_array(name: &str, item: &Item) -> anyhow::Result<Vec<String>> {
let array = item
.as_array()
.with_context(|| format!("configure option {name} is not an array"))?;
array
.iter()
.map(|value| {
value
.as_str()
.map(str::to_string)
.with_context(|| format!("Array item {value:?} for key {name} is not a string"))
})
.collect()
}
/// Configurable semaphore permits setting.
///
/// Does not allow semaphore permits to be zero, because at runtime initially zero permits and empty
@@ -835,10 +799,10 @@ log_format = 'json'
fn parse_defaults() -> anyhow::Result<()> {
let tempdir = tempdir()?;
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
let broker_endpoint = "http://127.0.0.1:7777";
let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;
// we have to create dummy values to overcome the validation errors
let config_string = format!(
"pg_distrib_dir='{}'\nid=10\nbroker_endpoints = ['{broker_endpoint}']",
"pg_distrib_dir='{}'\nid=10\nbroker_endpoint = '{broker_endpoint}'",
pg_distrib_dir.display()
);
let toml = config_string.parse()?;
@@ -864,10 +828,7 @@ log_format = 'json'
remote_storage_config: None,
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::default(),
broker_endpoints: vec![broker_endpoint
.parse()
.expect("Failed to parse a valid broker endpoint URL")],
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
},
@@ -881,10 +842,10 @@ log_format = 'json'
fn parse_basic_config() -> anyhow::Result<()> {
let tempdir = tempdir()?;
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
let broker_endpoint = "http://127.0.0.1:7777";
let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;
let config_string = format!(
"{ALL_BASE_VALUES_TOML}pg_distrib_dir='{}'\nbroker_endpoints = ['{broker_endpoint}']",
"{ALL_BASE_VALUES_TOML}pg_distrib_dir='{}'\nbroker_endpoint = '{broker_endpoint}'",
pg_distrib_dir.display()
);
let toml = config_string.parse()?;
@@ -910,10 +871,7 @@ log_format = 'json'
remote_storage_config: None,
profiling: ProfilingConfig::Disabled,
default_tenant_conf: TenantConf::default(),
broker_endpoints: vec![broker_endpoint
.parse()
.expect("Failed to parse a valid broker endpoint URL")],
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
broker_endpoint: storage_broker::DEFAULT_ENDPOINT.parse().unwrap(),
log_format: LogFormat::Json,
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
},
@@ -947,7 +905,7 @@ local_path = '{}'"#,
let config_string = format!(
r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{}'
broker_endpoints = ['{broker_endpoint}']
broker_endpoint = '{broker_endpoint}'
{remote_storage_config_str}"#,
pg_distrib_dir.display(),
@@ -1014,7 +972,7 @@ concurrency_limit = {s3_concurrency_limit}"#
let config_string = format!(
r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{}'
broker_endpoints = ['{broker_endpoint}']
broker_endpoint = '{broker_endpoint}'
{remote_storage_config_str}"#,
pg_distrib_dir.display(),
@@ -1059,7 +1017,7 @@ broker_endpoints = ['{broker_endpoint}']
let config_string = format!(
r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{}'
broker_endpoints = ['{broker_endpoint}']
broker_endpoint = '{broker_endpoint}'
[tenant_config]
trace_read_requests = {trace_read_requests}"#,

View File

@@ -71,7 +71,7 @@ use crate::shutdown_pageserver;
//
// WAL receiver runtime:
// - used to handle WAL receiver connections.
// - and to receiver updates from etcd
// - and to receive updates from storage_broker
//
// Background runtime
// - layer flushing
@@ -178,7 +178,7 @@ pub enum TaskKind {
PageRequestHandler,
// Manages the WAL receiver connection for one timeline. It subscribes to
// events from etcd, decides which safekeeper to connect to. It spawns a
// events from storage_broker, decides which safekeeper to connect to. It spawns a
// separate WalReceiverConnection task to handle each connection.
WalReceiverManager,

View File

@@ -54,7 +54,7 @@ use utils::{
use crate::repository::GcResult;
use crate::repository::{Key, Value};
use crate::task_mgr::TaskKind;
use crate::walreceiver::{is_etcd_client_initialized, spawn_connection_manager_task};
use crate::walreceiver::{is_broker_client_initialized, spawn_connection_manager_task};
use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;
use crate::METADATA_FILE_NAME;
@@ -856,12 +856,12 @@ impl Timeline {
}
pub(super) fn launch_wal_receiver(self: &Arc<Self>) {
if !is_etcd_client_initialized() {
if !is_broker_client_initialized() {
if cfg!(test) {
info!("not launching WAL receiver because etcd client hasn't been initialized");
info!("not launching WAL receiver because broker client hasn't been initialized");
return;
} else {
panic!("etcd client not initialized");
panic!("broker client not initialized");
}
}
@@ -882,7 +882,6 @@ impl Timeline {
drop(tenant_conf_guard);
let self_clone = Arc::clone(self);
spawn_connection_manager_task(
self.conf.broker_etcd_prefix.clone(),
self_clone,
walreceiver_connect_timeout,
lagging_wal_timeout,

View File

@@ -6,7 +6,7 @@
//! hence WAL receiver needs to react on such events.
//!
//! * get a broker subscription, stream data from it to determine that a timeline needs WAL streaming.
//! For that, it watches specific keys in etcd broker and pulls the relevant data periodically.
//! For that, it subscribes to the relevant timelines in storage_broker and pulls their data periodically.
//! The data is produced by safekeepers, which push it periodically and pull it to synchronize with each other.
//! Without this data, no WAL streaming is possible currently.
//!
@@ -26,57 +26,49 @@ mod walreceiver_connection;
use crate::config::PageServerConf;
use crate::task_mgr::WALRECEIVER_RUNTIME;
use anyhow::{ensure, Context};
use etcd_broker::Client;
use itertools::Itertools;
use anyhow::Context;
use once_cell::sync::OnceCell;
use std::future::Future;
use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tracing::*;
use url::Url;
pub use connection_manager::spawn_connection_manager_task;
static ETCD_CLIENT: OnceCell<Client> = OnceCell::new();
static BROKER_CLIENT: OnceCell<BrokerClientChannel> = OnceCell::new();
///
/// Initialize the etcd client. This must be called once at page server startup.
/// Initialize the broker client. This must be called once at page server startup.
///
pub async fn init_etcd_client(conf: &'static PageServerConf) -> anyhow::Result<()> {
let etcd_endpoints = conf.broker_endpoints.clone();
ensure!(
!etcd_endpoints.is_empty(),
"Cannot start wal receiver: etcd endpoints are empty"
);
pub async fn init_broker_client(conf: &'static PageServerConf) -> anyhow::Result<()> {
let broker_endpoint = conf.broker_endpoint.clone();
let etcd_client = Client::connect(etcd_endpoints.clone(), None)
.await
.context("Failed to connect to etcd")?;
// Note: we do not attempt connecting here (but we do validate endpoint sanity).
let broker_client = storage_broker::connect(broker_endpoint.clone()).context(format!(
"Failed to create broker client to {}",
&conf.broker_endpoint
))?;
// FIXME: Should we still allow the pageserver to start, if etcd
// doesn't work? It could still serve GetPage requests, with the
// data it has locally and from what it can download from remote
// storage
if ETCD_CLIENT.set(etcd_client).is_err() {
panic!("etcd already initialized");
if BROKER_CLIENT.set(broker_client).is_err() {
panic!("broker already initialized");
}
info!(
"Initialized etcd client with endpoints: {}",
etcd_endpoints.iter().map(Url::to_string).join(", ")
"Initialized broker client with endpoints: {}",
broker_endpoint
);
Ok(())
}
///
/// Get a handle to the etcd client
/// Get a handle to the broker client
///
pub fn get_etcd_client() -> &'static etcd_broker::Client {
ETCD_CLIENT.get().expect("etcd client not initialized")
pub fn get_broker_client() -> &'static BrokerClientChannel {
BROKER_CLIENT.get().expect("broker client not initialized")
}
pub fn is_etcd_client_initialized() -> bool {
ETCD_CLIENT.get().is_some()
pub fn is_broker_client_initialized() -> bool {
BROKER_CLIENT.get().is_some()
}
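To make the new contract concrete, here is a minimal sketch of the intended call order; start_pageserver is a hypothetical wrapper, while init_broker_client and get_broker_client are the functions defined above:

use crate::config::PageServerConf;
use crate::walreceiver::{get_broker_client, init_broker_client};

// Hypothetical startup sketch: initialize the shared client exactly once,
// before any Timeline::launch_wal_receiver() call, which panics outside of
// cfg!(test) when the client is missing (see the Timeline hunk earlier).
async fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
    init_broker_client(conf).await?;
    // Connection managers later clone the shared lazy channel; tonic channels
    // are cheap to clone and multiplex streams over one TCP connection.
    let _client = get_broker_client().clone();
    Ok(())
}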
/// A handle of an asynchronous task.

View File

@@ -1,21 +1,15 @@
//! WAL receiver logic that ensures the pageserver gets connected to the safekeeper
//! that holds the latest WAL to stream, and that this connection does not go stale.
//!
//! To achieve that, a etcd broker is used: safekepers propagate their timelines' state in it,
//! To achieve that, a storage broker is used: safekeepers propagate their timelines' state in it,
//! the manager subscribes for changes and accumulates them to pick the safekeeper with the biggest Lsn to connect to.
//! Current connection state is tracked too, to ensure it's not getting stale.
//!
//! After every connection or etcd update fetched, the state gets updated correspondingly and rechecked for the new conneciton leader,
//! After every connection attempt or storage broker update, the state gets updated correspondingly and rechecked for a new connection candidate,
//! then a [re]connection happens, if necessary.
//! Only WAL streaming task expects to be finished, other loops (etcd, connection management) never exit unless cancelled explicitly via the dedicated channel.
//! Only the WAL streaming task is expected to finish; the other loops (storage broker, connection management) never exit unless cancelled explicitly via the dedicated channel.
use std::{
collections::{hash_map, HashMap},
num::NonZeroU64,
ops::ControlFlow,
sync::Arc,
time::Duration,
};
use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
use crate::task_mgr::TaskKind;
use crate::task_mgr::WALRECEIVER_RUNTIME;
@@ -23,16 +17,18 @@ use crate::tenant::Timeline;
use crate::{task_mgr, walreceiver::TaskStateUpdate};
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
use etcd_broker::{
subscription_key::SubscriptionKey, subscription_value::SkTimelineInfo, BrokerSubscription,
BrokerUpdate, Client,
};
use pageserver_api::models::TimelineState;
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::SubscribeSafekeeperInfoRequest;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use storage_broker::BrokerClientChannel;
use storage_broker::Streaming;
use tokio::{select, sync::watch};
use tracing::*;
use crate::{
exponential_backoff, walreceiver::get_etcd_client, DEFAULT_BASE_BACKOFF_SECONDS,
exponential_backoff, walreceiver::get_broker_client, DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
};
use postgres_connection::{parse_host_port, PgConnectionConfig};
@@ -45,14 +41,13 @@ use super::{walreceiver_connection::WalConnectionStatus, TaskEvent, TaskHandle};
/// Spawns the loop to take care of the timeline's WAL streaming connection.
pub fn spawn_connection_manager_task(
broker_loop_prefix: String,
timeline: Arc<Timeline>,
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
auth_token: Option<Arc<String>>,
) {
let mut etcd_client = get_etcd_client().clone();
let mut broker_client = get_broker_client().clone();
let tenant_id = timeline.tenant_id;
let timeline_id = timeline.timeline_id;
@@ -65,7 +60,7 @@ pub fn spawn_connection_manager_task(
&format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
false,
async move {
info!("WAL receiver broker started, connecting to etcd");
info!("WAL receiver manager started, connecting to broker");
let mut walreceiver_state = WalreceiverState::new(
timeline,
wal_connect_timeout,
@@ -81,8 +76,7 @@ pub fn spawn_connection_manager_task(
return Ok(());
},
loop_step_result = connection_manager_loop_step(
&broker_loop_prefix,
&mut etcd_client,
&mut broker_client,
&mut walreceiver_state,
) => match loop_step_result {
ControlFlow::Continue(()) => continue,
@@ -103,10 +97,9 @@ pub fn spawn_connection_manager_task(
/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
/// Based on the updates, decides whether to start, keep or stop a WAL receiver task.
/// If etcd subscription is cancelled, exits.
/// If storage broker subscription is cancelled, exits.
async fn connection_manager_loop_step(
broker_prefix: &str,
etcd_client: &mut Client,
broker_client: &mut BrokerClientChannel,
walreceiver_state: &mut WalreceiverState,
) -> ControlFlow<(), ()> {
let mut timeline_state_updates = walreceiver_state.timeline.subscribe_for_state_updates();
@@ -124,13 +117,11 @@ async fn connection_manager_loop_step(
timeline_id: walreceiver_state.timeline.timeline_id,
};
// XXX: We never explicitly cancel etcd task, instead establishing one and never letting it go,
// running the entire loop step as much as possible to an end.
// The task removal happens implicitly on drop, both aborting the etcd subscription task and dropping the receiver channel end,
// forcing the etcd subscription to exit either way.
let mut broker_subscription =
subscribe_for_timeline_updates(etcd_client, broker_prefix, id).await;
info!("Subscribed for etcd timeline changes, waiting for new etcd data");
// Subscribe to the broker updates. The stream shares the underlying TCP connection
// with other streams on this client (other connection managers). When the
// object goes out of scope, the stream finishes in drop() automatically.
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await;
info!("Subscribed for broker timeline updates");
loop {
let time_until_next_retry = walreceiver_state.time_until_next_retry();
@@ -145,12 +136,6 @@ async fn connection_manager_loop_step(
// - this might change the current desired connection
// - timeline state changes to something that does not allow walreceiver to run concurrently
select! {
broker_connection_result = &mut broker_subscription.watcher_handle => {
info!("Broker connection was closed from the other side, ending current broker loop step");
cleanup_broker_connection(broker_connection_result, walreceiver_state);
return ControlFlow::Continue(());
},
Some(wal_connection_update) = async {
match walreceiver_state.wal_connection.as_mut() {
Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
@@ -185,22 +170,16 @@ async fn connection_manager_loop_step(
}
},
// Got a new update from etcd
broker_update = broker_subscription.value_updates.recv() => {
// Got a new update from the broker
broker_update = broker_subscription.message() => {
match broker_update {
Some(broker_update) => walreceiver_state.register_timeline_update(broker_update),
None => {
info!("Broker sender end was dropped, ending current broker loop step");
// Ensure to cancel and wait for the broker subscription task end, to log its result.
// Broker sender end is in the broker subscription task and its drop means abnormal task completion.
// First, ensure that the task is stopped (abort can be done without errors on already stopped tasks and repeated multiple times).
broker_subscription.watcher_handle.abort();
// Then, wait for the task to finish and print its result. If the task was finished before abort (which we assume in this abnormal case),
// a proper error message will be printed, otherwise an abortion message is printed which is ok, since we're signalled to finish anyway.
cleanup_broker_connection(
(&mut broker_subscription.watcher_handle).await,
walreceiver_state,
);
Ok(Some(broker_update)) => walreceiver_state.register_timeline_update(broker_update),
Err(e) => {
error!("broker subscription failed: {e}");
return ControlFlow::Continue(());
}
Ok(None) => {
error!("broker subscription stream ended"); // can't happen
return ControlFlow::Continue(());
}
}
@@ -234,17 +213,6 @@ async fn connection_manager_loop_step(
_ = async { tokio::time::sleep(time_until_next_retry.unwrap()).await }, if time_until_next_retry.is_some() => {}
}
// Fetch more etcd timeline updates, but limit ourselves since they may arrive quickly.
let mut max_events_to_poll = 100_u32;
while max_events_to_poll > 0 {
if let Ok(broker_update) = broker_subscription.value_updates.try_recv() {
walreceiver_state.register_timeline_update(broker_update);
max_events_to_poll -= 1;
} else {
break;
}
}
if let Some(new_candidate) = walreceiver_state.next_connection_candidate() {
info!("Switching to new connection candidate: {new_candidate:?}");
walreceiver_state
@@ -285,33 +253,11 @@ async fn wait_for_active_timeline(
}
}
fn cleanup_broker_connection(
broker_connection_result: Result<Result<(), etcd_broker::BrokerError>, tokio::task::JoinError>,
walreceiver_state: &mut WalreceiverState,
) {
match broker_connection_result {
Ok(Ok(())) => info!("Broker conneciton task finished, ending current broker loop step"),
Ok(Err(broker_error)) => warn!("Broker conneciton ended with error: {broker_error}"),
Err(abort_error) => {
if abort_error.is_panic() {
error!("Broker connection panicked: {abort_error}")
} else {
debug!("Broker connection aborted: {abort_error}")
}
}
}
walreceiver_state.wal_stream_candidates.clear();
}
/// Endlessly try to subscribe for broker updates for a given timeline.
/// If there are no safekeepers to maintain the lease, the timeline subscription will be unavailable in the broker and the operation will fail constantly.
/// This is ok; pageservers should keep trying to subscribe (with some backoff), since it's the only way they can get the timeline WAL.
async fn subscribe_for_timeline_updates(
etcd_client: &mut Client,
broker_prefix: &str,
broker_client: &mut BrokerClientChannel,
id: TenantTimelineId,
) -> BrokerSubscription<SkTimelineInfo> {
) -> Streaming<SafekeeperTimelineInfo> {
let mut attempt = 0;
loop {
exponential_backoff(
@@ -322,18 +268,21 @@ async fn subscribe_for_timeline_updates(
.await;
attempt += 1;
match etcd_broker::subscribe_for_json_values(
etcd_client,
SubscriptionKey::sk_timeline_info(broker_prefix.to_owned(), id),
)
.instrument(info_span!("etcd_subscription"))
.await
{
Ok(new_subscription) => {
return new_subscription;
// subscribe to the specific timeline
let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
tenant_id: id.tenant_id.as_ref().to_owned(),
timeline_id: id.timeline_id.as_ref().to_owned(),
});
let request = SubscribeSafekeeperInfoRequest {
subscription_key: Some(key),
};
match broker_client.subscribe_safekeeper_info(request).await {
Ok(resp) => {
return resp.into_inner();
}
Err(e) => {
warn!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in etcd: {e:#}");
warn!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}");
continue;
}
}
@@ -360,8 +309,8 @@ struct WalreceiverState {
wal_connection: Option<WalConnection>,
/// Info about retries and unsuccessful attempts to connect to safekeepers.
wal_connection_retries: HashMap<NodeId, RetryInfo>,
/// Data about all timelines, available for connection, fetched from etcd, grouped by their corresponding safekeeper node id.
wal_stream_candidates: HashMap<NodeId, EtcdSkTimeline>,
/// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
auth_token: Option<Arc<String>>,
}
@@ -395,13 +344,11 @@ struct RetryInfo {
retry_duration_seconds: f64,
}
/// Data about the timeline to connect to, received from etcd.
/// Data about the timeline to connect to, received from the broker.
#[derive(Debug)]
struct EtcdSkTimeline {
timeline: SkTimelineInfo,
/// Etcd generation, the bigger it is, the more up to date the timeline data is.
etcd_version: i64,
/// Time at which the data was fetched from etcd last time, to track the stale data.
struct BrokerSkTimeline {
timeline: SafekeeperTimelineInfo,
/// Time at which the data was fetched from the broker last time, to track the stale data.
latest_update: NaiveDateTime,
}
@@ -538,31 +485,18 @@ impl WalreceiverState {
next_retry_at.and_then(|next_retry_at| (next_retry_at - now).to_std().ok())
}
/// Adds another etcd timeline into the state, if its more recent than the one already added there for the same key.
fn register_timeline_update(&mut self, timeline_update: BrokerUpdate<SkTimelineInfo>) {
match self
.wal_stream_candidates
.entry(timeline_update.key.node_id)
{
hash_map::Entry::Occupied(mut o) => {
let existing_value = o.get_mut();
if existing_value.etcd_version < timeline_update.etcd_version {
existing_value.etcd_version = timeline_update.etcd_version;
existing_value.timeline = timeline_update.value;
existing_value.latest_update = Utc::now().naive_utc();
}
}
hash_map::Entry::Vacant(v) => {
v.insert(EtcdSkTimeline {
timeline: timeline_update.value,
etcd_version: timeline_update.etcd_version,
latest_update: Utc::now().naive_utc(),
});
}
}
/// Adds another broker timeline into the state, overwriting the entry already present for the same safekeeper, if any.
fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
self.wal_stream_candidates.insert(
NodeId(timeline_update.safekeeper_id),
BrokerSkTimeline {
timeline: timeline_update,
latest_update: Utc::now().naive_utc(),
},
);
}
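One convention shift worth spelling out: SafekeeperTimelineInfo carries LSNs as bare u64, so the old Option<Lsn> fields become "Lsn::INVALID (0) means absent", as in the commit_lsn handling below. A hedged sketch of the two conversions (helper names are made up):

use utils::lsn::Lsn;

// Outgoing: Lsn -> u64, the representation stored in the proto fields.
fn lsn_to_wire(lsn: Lsn) -> u64 {
    lsn.0
}

// Incoming: u64 -> Lsn, where 0 (Lsn::INVALID) plays the old None's role.
fn lsn_from_wire(raw: u64) -> Option<Lsn> {
    let lsn = Lsn(raw);
    (lsn != Lsn::INVALID).then_some(lsn)
}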
/// Cleans up stale etcd records and checks the rest for the new connection candidate.
/// Cleans up stale broker records and checks the rest for the new connection candidate.
/// Returns a new candidate, if the current state is absent or somewhat lagging, `None` otherwise.
/// The current rules for approving new candidates:
/// * pick a candidate different from the connected safekeeper with the biggest `commit_lsn` and the lowest number of failed connection attempts
@@ -585,7 +519,7 @@ impl WalreceiverState {
Some(existing_wal_connection) => {
let connected_sk_node = existing_wal_connection.sk_id;
let (new_sk_id, new_safekeeper_etcd_data, new_wal_source_connconf) =
let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
self.select_connection_candidate(Some(connected_sk_node))?;
let now = Utc::now().naive_utc();
@@ -614,7 +548,7 @@ impl WalreceiverState {
}
if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn {
let new_commit_lsn = new_safekeeper_etcd_data.commit_lsn.unwrap_or(Lsn(0));
let new_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
// Check if the new candidate has much more WAL than the current one.
match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
Some(new_sk_lsn_advantage) => {
@@ -644,7 +578,7 @@ impl WalreceiverState {
.status
.commit_lsn
.unwrap_or(current_lsn);
let candidate_commit_lsn = new_safekeeper_etcd_data.commit_lsn.unwrap_or(Lsn(0));
let candidate_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
// Keep discovered_new_wal only if connected safekeeper has not caught up yet.
let mut discovered_new_wal = existing_wal_connection
@@ -727,7 +661,7 @@ impl WalreceiverState {
None
}
/// Selects the best possible candidate, based on the data collected from etcd updates about the safekeepers.
/// Selects the best possible candidate, based on the data collected from the broker updates about the safekeepers.
/// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
///
/// The candidate that is chosen:
@@ -736,7 +670,7 @@ impl WalreceiverState {
fn select_connection_candidate(
&self,
node_to_omit: Option<NodeId>,
) -> Option<(NodeId, &SkTimelineInfo, PgConnectionConfig)> {
) -> Option<(NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
self.applicable_connection_candidates()
.filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
.max_by_key(|(_, info, _)| info.commit_lsn)
@@ -746,12 +680,12 @@ impl WalreceiverState {
/// Some safekeepers are filtered by the retry cooldown.
fn applicable_connection_candidates(
&self,
) -> impl Iterator<Item = (NodeId, &SkTimelineInfo, PgConnectionConfig)> {
) -> impl Iterator<Item = (NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
let now = Utc::now().naive_utc();
self.wal_stream_candidates
.iter()
.filter(|(_, info)| info.timeline.commit_lsn.is_some())
.filter(|(_, info)| Lsn(info.timeline.commit_lsn) != Lsn::INVALID)
.filter(move |(sk_id, _)| {
let next_retry_at = self
.wal_connection_retries
@@ -761,12 +695,14 @@ impl WalreceiverState {
});
next_retry_at.is_none() || next_retry_at.unwrap() <= now
})
.filter_map(|(sk_id, etcd_info)| {
let info = &etcd_info.timeline;
}).filter_map(|(sk_id, broker_info)| {
let info = &broker_info.timeline;
if info.safekeeper_connstr.is_empty() {
return None; // no connection string, ignore sk
}
match wal_stream_connection_config(
self.id,
info.safekeeper_connstr.as_deref()?,
info.safekeeper_connstr.as_ref(),
match &self.auth_token {
None => None,
Some(x) => Some(x),
@@ -781,15 +717,15 @@ impl WalreceiverState {
})
}
/// Remove candidates which haven't sent etcd updates for a while.
/// Remove candidates which haven't sent broker updates for a while.
fn cleanup_old_candidates(&mut self) {
let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
self.wal_stream_candidates.retain(|node_id, etcd_info| {
if let Ok(time_since_latest_etcd_update) =
(Utc::now().naive_utc() - etcd_info.latest_update).to_std()
self.wal_stream_candidates.retain(|node_id, broker_info| {
if let Ok(time_since_latest_broker_update) =
(Utc::now().naive_utc() - broker_info.latest_update).to_std()
{
let should_retain = time_since_latest_etcd_update < self.lagging_wal_timeout;
let should_retain = time_since_latest_broker_update < self.lagging_wal_timeout;
if !should_retain {
node_ids_to_remove.push(*node_id);
}
@@ -870,6 +806,28 @@ mod tests {
use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
use url::Host;
fn dummy_broker_sk_timeline(
commit_lsn: u64,
safekeeper_connstr: &str,
latest_update: NaiveDateTime,
) -> BrokerSkTimeline {
BrokerSkTimeline {
timeline: SafekeeperTimelineInfo {
safekeeper_id: 0,
tenant_timeline_id: None,
last_log_term: 0,
flush_lsn: 0,
commit_lsn,
backup_lsn: 0,
remote_consistent_lsn: 0,
peer_horizon_lsn: 0,
local_start_lsn: 0,
safekeeper_connstr: safekeeper_connstr.to_owned(),
},
latest_update,
}
}
#[tokio::test]
async fn no_connection_no_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("no_connection_no_candidate")?;
@@ -881,74 +839,16 @@ mod tests {
state.wal_connection = None;
state.wal_stream_candidates = HashMap::from([
(
NodeId(0),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(Lsn(1)),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: None,
},
etcd_version: 0,
latest_update: now,
},
),
(
NodeId(1),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: None,
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("no_commit_lsn".to_string()),
},
etcd_version: 0,
latest_update: now,
},
),
(
NodeId(2),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: None,
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("no_commit_lsn".to_string()),
},
etcd_version: 0,
latest_update: now,
},
),
(NodeId(0), dummy_broker_sk_timeline(1, "", now)),
(NodeId(1), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
(NodeId(2), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
(
NodeId(3),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(Lsn(1 + state.max_lsn_wal_lag.get())),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: None,
},
etcd_version: 0,
latest_update: delay_over_threshold,
},
dummy_broker_sk_timeline(
1 + state.max_lsn_wal_lag.get(),
"delay_over_threshold",
delay_over_threshold,
),
),
]);
@@ -995,57 +895,23 @@ mod tests {
state.wal_stream_candidates = HashMap::from([
(
connected_sk_id,
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(Lsn(current_lsn + state.max_lsn_wal_lag.get() * 2)),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
},
etcd_version: 0,
latest_update: now,
},
dummy_broker_sk_timeline(
current_lsn + state.max_lsn_wal_lag.get() * 2,
DUMMY_SAFEKEEPER_HOST,
now,
),
),
(
NodeId(1),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(Lsn(current_lsn)),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("not_advanced_lsn".to_string()),
},
etcd_version: 0,
latest_update: now,
},
dummy_broker_sk_timeline(current_lsn, "not_advanced_lsn", now),
),
(
NodeId(2),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(Lsn(current_lsn + state.max_lsn_wal_lag.get() / 2)),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("not_enough_advanced_lsn".to_string()),
},
etcd_version: 0,
latest_update: now,
},
dummy_broker_sk_timeline(
current_lsn + state.max_lsn_wal_lag.get() / 2,
"not_enough_advanced_lsn",
now,
),
),
]);
@@ -1067,21 +933,7 @@ mod tests {
state.wal_connection = None;
state.wal_stream_candidates = HashMap::from([(
NodeId(0),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(Lsn(1 + state.max_lsn_wal_lag.get())),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
},
etcd_version: 0,
latest_update: now,
},
dummy_broker_sk_timeline(1 + state.max_lsn_wal_lag.get(), DUMMY_SAFEKEEPER_HOST, now),
)]);
let only_candidate = state
@@ -1102,57 +954,15 @@ mod tests {
state.wal_stream_candidates = HashMap::from([
(
NodeId(0),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(Lsn(selected_lsn - 100)),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("smaller_commit_lsn".to_string()),
},
etcd_version: 0,
latest_update: now,
},
dummy_broker_sk_timeline(selected_lsn - 100, "smaller_commit_lsn", now),
),
(
NodeId(1),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(Lsn(selected_lsn)),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
},
etcd_version: 0,
latest_update: now,
},
dummy_broker_sk_timeline(selected_lsn, DUMMY_SAFEKEEPER_HOST, now),
),
(
NodeId(2),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(Lsn(selected_lsn + 100)),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: None,
},
etcd_version: 0,
latest_update: now,
},
dummy_broker_sk_timeline(selected_lsn + 100, "", now),
),
]);
let biggest_wal_candidate = state.next_connection_candidate().expect(
@@ -1186,39 +996,11 @@ mod tests {
state.wal_stream_candidates = HashMap::from([
(
NodeId(0),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(bigger_lsn),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
},
etcd_version: 0,
latest_update: now,
},
dummy_broker_sk_timeline(bigger_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
),
(
NodeId(1),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(current_lsn),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
},
etcd_version: 0,
latest_update: now,
},
dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
),
]);
state.wal_connection_retries = HashMap::from([(
@@ -1275,39 +1057,11 @@ mod tests {
state.wal_stream_candidates = HashMap::from([
(
connected_sk_id,
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(current_lsn),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
},
etcd_version: 0,
latest_update: now,
},
dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
),
(
NodeId(1),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(new_lsn),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some("advanced_by_lsn_safekeeper".to_string()),
},
etcd_version: 0,
latest_update: now,
},
dummy_broker_sk_timeline(new_lsn.0, "advanced_by_lsn_safekeeper", now),
),
]);
@@ -1367,21 +1121,7 @@ mod tests {
});
state.wal_stream_candidates = HashMap::from([(
NodeId(0),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(current_lsn),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
},
etcd_version: 0,
latest_update: now,
},
dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
)]);
let over_threshcurrent_candidate = state.next_connection_candidate().expect(
@@ -1441,21 +1181,7 @@ mod tests {
});
state.wal_stream_candidates = HashMap::from([(
NodeId(0),
EtcdSkTimeline {
timeline: SkTimelineInfo {
last_log_term: None,
flush_lsn: None,
commit_lsn: Some(new_lsn),
backup_lsn: None,
remote_consistent_lsn: None,
peer_horizon_lsn: None,
local_start_lsn: None,
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_HOST.to_string()),
},
etcd_version: 0,
latest_update: now,
},
dummy_broker_sk_timeline(new_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
)]);
let over_threshcurrent_candidate = state.next_connection_candidate().expect(

View File

@@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
async-stream = "0.3"
anyhow = "1.0"
async-trait = "0.1"
byteorder = "1.4.3"
@@ -33,12 +34,12 @@ toml_edit = { version = "0.14", features = ["easy"] }
tracing = "0.1.27"
url = "2.2.2"
etcd_broker = { path = "../libs/etcd_broker" }
metrics = { path = "../libs/metrics" }
postgres_ffi = { path = "../libs/postgres_ffi" }
pq_proto = { path = "../libs/pq_proto" }
remote_storage = { path = "../libs/remote_storage" }
safekeeper_api = { path = "../libs/safekeeper_api" }
storage_broker = { version = "0.1", path = "../storage_broker" }
utils = { path = "../libs/utils" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -13,7 +13,6 @@ use std::thread;
use tokio::sync::mpsc;
use toml_edit::Document;
use tracing::*;
use url::{ParseError, Url};
use utils::pid_file;
use metrics::set_build_info_metric;
@@ -29,6 +28,7 @@ use safekeeper::wal_backup;
use safekeeper::wal_service;
use safekeeper::GlobalTimelines;
use safekeeper::SafeKeeperConf;
use storage_broker::DEFAULT_ENDPOINT;
use utils::auth::JwtAuth;
use utils::{
http::endpoint,
@@ -82,12 +82,8 @@ fn main() -> anyhow::Result<()> {
));
}
if let Some(addr) = arg_matches.get_one::<String>("broker-endpoints") {
let collected_ep: Result<Vec<Url>, ParseError> = addr.split(',').map(Url::parse).collect();
conf.broker_endpoints = collected_ep.context("Failed to parse broker endpoint urls")?;
}
if let Some(prefix) = arg_matches.get_one::<String>("broker-etcd-prefix") {
conf.broker_etcd_prefix = prefix.to_string();
if let Some(addr) = arg_matches.get_one::<String>("broker-endpoint") {
conf.broker_endpoint = addr.parse().context("failed to parse broker endpoint")?;
}
if let Some(heartbeat_timeout_str) = arg_matches.get_one::<String>("heartbeat-timeout") {
@@ -224,19 +220,15 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
threads.push(safekeeper_thread);
if !conf.broker_endpoints.is_empty() {
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("broker thread".into())
.spawn(|| {
// TODO: add auth?
broker::thread_main(conf_);
})?,
);
} else {
warn!("No broker endpoints providing, starting without node sync")
}
let conf_ = conf.clone();
threads.push(
thread::Builder::new()
.name("broker thread".into())
.spawn(|| {
// TODO: add auth?
broker::thread_main(conf_);
})?,
);
let conf_ = conf.clone();
threads.push(
@@ -369,14 +361,9 @@ fn cli() -> Command {
.arg(
Arg::new("id").long("id").help("safekeeper node id: integer")
).arg(
Arg::new("broker-endpoints")
.long("broker-endpoints")
.help("a comma separated broker (etcd) endpoints for storage nodes coordination, e.g. 'http://127.0.0.1:2379'"),
)
.arg(
Arg::new("broker-etcd-prefix")
.long("broker-etcd-prefix")
.help("a prefix to always use when polling/pusing data in etcd from this safekeeper"),
Arg::new("broker-endpoint")
.long("broker-endpoint")
.help(formatcp!("Broker endpoint for storage node coordination in the form http[s]://host:port, default '{DEFAULT_ENDPOINT}'. With the https scheme a TLS connection is established; plaintext otherwise.")),
)
.arg(
Arg::new("heartbeat-timeout")

View File

@@ -1,15 +1,18 @@
//! Communication with etcd, providing safekeeper peers and pageserver coordination.
//! Communication with the broker, providing safekeeper peers and pageserver coordination.
use anyhow::anyhow;
use anyhow::bail;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use etcd_broker::subscription_value::SkTimelineInfo;
use etcd_broker::LeaseKeepAliveStream;
use etcd_broker::LeaseKeeper;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::HashSet;
use storage_broker::parse_proto_ttid;
use storage_broker::proto::broker_service_client::BrokerServiceClient;
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey as ProtoSubscriptionKey;
use storage_broker::proto::SubscribeSafekeeperInfoRequest;
use storage_broker::Request;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::{runtime, time::sleep};
@@ -17,15 +20,9 @@ use tracing::*;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
use etcd_broker::{
subscription_key::{OperationKind, SkOperationKind, SubscriptionKey},
Client, PutOptions,
};
use utils::id::{NodeId, TenantTimelineId};
const RETRY_INTERVAL_MSEC: u64 = 1000;
const PUSH_INTERVAL_MSEC: u64 = 1000;
const LEASE_TTL_SEC: i64 = 10;
pub fn thread_main(conf: SafeKeeperConf) {
let runtime = runtime::Builder::new_current_thread()
@@ -34,158 +31,70 @@ pub fn thread_main(conf: SafeKeeperConf) {
.unwrap();
let _enter = info_span!("broker").entered();
info!("started, broker endpoints {:?}", conf.broker_endpoints);
info!("started, broker endpoint {:?}", conf.broker_endpoint);
runtime.block_on(async {
main_loop(conf).await;
});
}
/// Key to per timeline per safekeeper data.
fn timeline_safekeeper_path(
broker_etcd_prefix: String,
ttid: TenantTimelineId,
sk_id: NodeId,
) -> String {
format!(
"{}/{sk_id}",
SubscriptionKey::sk_timeline_info(broker_etcd_prefix, ttid).watch_key()
)
}
async fn push_sk_info(
ttid: TenantTimelineId,
mut client: Client,
key: String,
sk_info: SkTimelineInfo,
mut lease: Lease,
) -> anyhow::Result<(TenantTimelineId, Lease)> {
let put_opts = PutOptions::new().with_lease(lease.id);
client
.put(
key.clone(),
serde_json::to_string(&sk_info)?,
Some(put_opts),
)
.await
.with_context(|| format!("failed to push safekeeper info to {}", key))?;
// revive the lease
lease
.keeper
.keep_alive()
.await
.context("failed to send LeaseKeepAliveRequest")?;
lease
.ka_stream
.message()
.await
.context("failed to receive LeaseKeepAliveResponse")?;
Ok((ttid, lease))
}
struct Lease {
id: i64,
keeper: LeaseKeeper,
ka_stream: LeaseKeepAliveStream,
}
/// Push once in a while data about all active timelines to the broker.
async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
let mut client = Client::connect(&conf.broker_endpoints, None).await?;
let mut leases: HashMap<TenantTimelineId, Lease> = HashMap::new();
let mut client = BrokerServiceClient::connect(conf.broker_endpoint.clone()).await?;
let push_interval = Duration::from_millis(PUSH_INTERVAL_MSEC);
loop {
// Note: we lock runtime here and in timeline methods as GlobalTimelines
// is under plain mutex. That's ok, all this code is not performance
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
let mut active_tlis = GlobalTimelines::get_all();
active_tlis.retain(|tli| tli.is_active());
let active_tlis_set: HashSet<TenantTimelineId> =
active_tlis.iter().map(|tli| tli.ttid).collect();
// // Get and maintain (if not yet) per timeline lease to automatically delete obsolete data.
for tli in &active_tlis {
if let Entry::Vacant(v) = leases.entry(tli.ttid) {
let lease = client.lease_grant(LEASE_TTL_SEC, None).await?;
let (keeper, ka_stream) = client.lease_keep_alive(lease.id()).await?;
v.insert(Lease {
id: lease.id(),
keeper,
ka_stream,
});
}
}
leases.retain(|ttid, _| active_tlis_set.contains(ttid));
// Push data concurrently to not suffer from latency, with many timelines it can be slow.
let handles = active_tlis
.iter()
.map(|tli| {
let outbound = async_stream::stream! {
loop {
// Note: we lock runtime here and in timeline methods as GlobalTimelines
// is under plain mutex. That's ok, all this code is not performance
// sensitive and there is no risk of deadlock as we don't await while
// lock is held.
let mut active_tlis = GlobalTimelines::get_all();
active_tlis.retain(|tli| tli.is_active());
for tli in &active_tlis {
let sk_info = tli.get_safekeeper_info(&conf);
let key =
timeline_safekeeper_path(conf.broker_etcd_prefix.clone(), tli.ttid, conf.my_id);
let lease = leases.remove(&tli.ttid).unwrap();
tokio::spawn(push_sk_info(tli.ttid, client.clone(), key, sk_info, lease))
})
.collect::<Vec<_>>();
for h in handles {
let (ttid, lease) = h.await??;
// It is ugly to pull leases from hash and then put it back, but
// otherwise we have to resort to long living per tli tasks (which
// would generate a lot of errors when etcd is down) as task wants to
// have 'static objects, we can't borrow to it.
leases.insert(ttid, lease);
yield sk_info;
}
sleep(push_interval).await;
}
sleep(push_interval).await;
}
};
client
.publish_safekeeper_info(Request::new(outbound))
.await?;
Ok(())
}
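Design note: the etcd leases and keep-alives are gone with nothing broker-side replacing them; publishing is now one long-lived client-streaming RPC, and staleness is instead handled by consumers (e.g. cleanup_old_candidates on the pageserver drops candidates that stop updating within lagging_wal_timeout). A stripped-down, self-contained sketch of the publish pattern, with a dummy payload standing in for get_safekeeper_info():

use std::time::Duration;
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::{BrokerClientChannel, Request};
use tokio::time::sleep;

async fn publish_sketch(mut client: BrokerClientChannel) -> anyhow::Result<()> {
    let outbound = async_stream::stream! {
        loop {
            // One message per active timeline in the real loop; prost-generated
            // types derive Default, so an empty message suffices for the sketch.
            yield SafekeeperTimelineInfo::default();
            sleep(Duration::from_millis(1000)).await;
        }
    };
    // Returns only if the server ends the stream; normally this runs forever.
    client.publish_safekeeper_info(Request::new(outbound)).await?;
    Ok(())
}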
/// Subscribe and fetch all the interesting data from the broker.
async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
let mut client = Client::connect(&conf.broker_endpoints, None).await?;
let mut client = storage_broker::connect(conf.broker_endpoint)?;
let mut subscription = etcd_broker::subscribe_for_values(
&mut client,
SubscriptionKey::all(conf.broker_etcd_prefix.clone()),
|full_key, value_str| {
if full_key.operation == OperationKind::Safekeeper(SkOperationKind::TimelineInfo) {
match serde_json::from_str::<SkTimelineInfo>(value_str) {
Ok(new_info) => return Some(new_info),
Err(e) => {
error!("Failed to parse timeline info from value str '{value_str}': {e}")
}
}
}
None
},
)
.await
.context("failed to subscribe for safekeeper info")?;
loop {
match subscription.value_updates.recv().await {
Some(new_info) => {
// note: there are blocking operations below, but it's considered fine for now
if let Ok(tli) = GlobalTimelines::get(new_info.key.id) {
// Note that we also receive *our own* info. That's
// important, as it is used as an indication of live
// connection to the broker.
tli.record_safekeeper_info(&new_info.value, new_info.key.node_id)
.await?
}
}
None => {
// XXX it means we lost connection with etcd, error is consumed inside sub object
debug!("timeline updates sender closed, aborting the pull loop");
return Ok(());
}
// TODO: subscribe only to local timelines instead of all
let request = SubscribeSafekeeperInfoRequest {
subscription_key: Some(ProtoSubscriptionKey::All(())),
};
let mut stream = client
.subscribe_safekeeper_info(request)
.await
.context("subscribe_safekeper_info request failed")?
.into_inner();
while let Some(msg) = stream.message().await? {
let proto_ttid = msg
.tenant_timeline_id
.as_ref()
.ok_or_else(|| anyhow!("missing tenant_timeline_id"))?;
let ttid = parse_proto_ttid(proto_ttid)?;
if let Ok(tli) = GlobalTimelines::get(ttid) {
// Note that we also receive *our own* info. That's
// important, as it is used as an indication of live
// connection to the broker.
// note: there are blocking operations below, but it's considered fine for now
tli.record_safekeeper_info(&msg).await?
}
}
bail!("end of stream");
}
async fn main_loop(conf: SafeKeeperConf) {

View File

@@ -3,11 +3,14 @@ use hyper::{Body, Request, Response, StatusCode, Uri};
use anyhow::Context;
use once_cell::sync::Lazy;
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::SkTimelineInfo;
use serde::Serialize;
use serde::Serializer;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::sync::Arc;
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio::task::JoinError;
use crate::safekeeper::ServerInfo;
@@ -16,7 +19,6 @@ use crate::safekeeper::Term;
use crate::timelines_global_map::TimelineDeleteForceResult;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
use etcd_broker::subscription_value::SkTimelineInfo;
use utils::{
auth::JwtAuth,
http::{
@@ -241,7 +243,22 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
parse_request_param(&request, "timeline_id")?,
);
check_permission(&request, Some(ttid.tenant_id))?;
let safekeeper_info: SkTimelineInfo = json_request(&mut request).await?;
let sk_info: SkTimelineInfo = json_request(&mut request).await?;
let proto_sk_info = SafekeeperTimelineInfo {
safekeeper_id: 0,
tenant_timeline_id: Some(ProtoTenantTimelineId {
tenant_id: ttid.tenant_id.as_ref().to_owned(),
timeline_id: ttid.timeline_id.as_ref().to_owned(),
}),
last_log_term: sk_info.last_log_term.unwrap_or(0),
flush_lsn: sk_info.flush_lsn.0,
commit_lsn: sk_info.commit_lsn.0,
remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
backup_lsn: sk_info.backup_lsn.0,
local_start_lsn: sk_info.local_start_lsn.0,
};
let tli = GlobalTimelines::get(ttid)
// `GlobalTimelines::get` returns an error when it can't find the timeline.
@@ -252,7 +269,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
)
})
.map_err(ApiError::NotFound)?;
tli.record_safekeeper_info(&safekeeper_info, NodeId(1))
tli.record_safekeeper_info(&proto_sk_info)
.await
.map_err(ApiError::InternalServerError)?;

View File

@@ -1,11 +1,11 @@
use defaults::{
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
};
use storage_broker::Uri;
//
use remote_storage::RemoteStorageConfig;
use std::path::PathBuf;
use std::time::Duration;
use url::Url;
use utils::{
id::{NodeId, TenantId, TenantTimelineId},
@@ -62,8 +62,7 @@ pub struct SafeKeeperConf {
pub backup_runtime_threads: usize,
pub wal_backup_enabled: bool,
pub my_id: NodeId,
pub broker_endpoints: Vec<Url>,
pub broker_etcd_prefix: String,
pub broker_endpoint: Uri,
pub auth_validation_public_key_path: Option<PathBuf>,
pub heartbeat_timeout: Duration,
pub max_offloader_lag_bytes: u64,
@@ -93,8 +92,9 @@ impl Default for SafeKeeperConf {
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
remote_storage: None,
my_id: NodeId(0),
broker_endpoints: Vec::new(),
broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
broker_endpoint: storage_broker::DEFAULT_ENDPOINT
.parse()
.expect("failed to parse default broker endpoint"),
backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
wal_backup_enabled: true,
auth_validation_public_key_path: None,

View File

@@ -4,13 +4,13 @@ use anyhow::{bail, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use etcd_broker::subscription_value::SkTimelineInfo;
use postgres_ffi::{TimeLineID, XLogSegNo, MAX_SEND_SIZE};
use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::cmp::min;
use std::fmt;
use std::io::Read;
use storage_broker::proto::SafekeeperTimelineInfo;
use tracing::*;
@@ -896,39 +896,38 @@ where
}
/// Update timeline state with peer safekeeper data.
pub fn record_safekeeper_info(&mut self, sk_info: &SkTimelineInfo) -> Result<()> {
pub fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
let mut sync_control_file = false;
if let (Some(commit_lsn), Some(last_log_term)) = (sk_info.commit_lsn, sk_info.last_log_term)
{
if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
// Note: the check is too restrictive; generally we can update the local
// commit_lsn if our history matches (is part of) the history of the advanced
// commit_lsn provider.
if last_log_term == self.get_epoch() {
self.global_commit_lsn = max(commit_lsn, self.global_commit_lsn);
if sk_info.last_log_term == self.get_epoch() {
self.global_commit_lsn = max(Lsn(sk_info.commit_lsn), self.global_commit_lsn);
self.update_commit_lsn()?;
}
}
if let Some(backup_lsn) = sk_info.backup_lsn {
let new_backup_lsn = max(backup_lsn, self.inmem.backup_lsn);
sync_control_file |=
self.state.backup_lsn + (self.state.server.wal_seg_size as u64) < new_backup_lsn;
self.inmem.backup_lsn = new_backup_lsn;
}
if let Some(remote_consistent_lsn) = sk_info.remote_consistent_lsn {
let new_remote_consistent_lsn =
max(remote_consistent_lsn, self.inmem.remote_consistent_lsn);
sync_control_file |= self.state.remote_consistent_lsn
+ (self.state.server.wal_seg_size as u64)
< new_remote_consistent_lsn;
self.inmem.remote_consistent_lsn = new_remote_consistent_lsn;
}
if let Some(peer_horizon_lsn) = sk_info.peer_horizon_lsn {
let new_peer_horizon_lsn = max(peer_horizon_lsn, self.inmem.peer_horizon_lsn);
sync_control_file |= self.state.peer_horizon_lsn
+ (self.state.server.wal_seg_size as u64)
< new_peer_horizon_lsn;
self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;
}
let new_backup_lsn = max(Lsn(sk_info.backup_lsn), self.inmem.backup_lsn);
sync_control_file |=
self.state.backup_lsn + (self.state.server.wal_seg_size as u64) < new_backup_lsn;
self.inmem.backup_lsn = new_backup_lsn;
let new_remote_consistent_lsn = max(
Lsn(sk_info.remote_consistent_lsn),
self.inmem.remote_consistent_lsn,
);
sync_control_file |= self.state.remote_consistent_lsn
+ (self.state.server.wal_seg_size as u64)
< new_remote_consistent_lsn;
self.inmem.remote_consistent_lsn = new_remote_consistent_lsn;
let new_peer_horizon_lsn = max(Lsn(sk_info.peer_horizon_lsn), self.inmem.peer_horizon_lsn);
sync_control_file |= self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64)
< new_peer_horizon_lsn;
self.inmem.peer_horizon_lsn = new_peer_horizon_lsn;
if sync_control_file {
self.persist_control_file(self.state.clone())?;
}

View File

@@ -2,7 +2,6 @@
//! to glue together SafeKeeper and all other background services.
use anyhow::{bail, Result};
use etcd_broker::subscription_value::SkTimelineInfo;
use parking_lot::{Mutex, MutexGuard};
use postgres_ffi::XLogSegNo;
use pq_proto::ReplicationFeedback;
@@ -18,6 +17,9 @@ use utils::{
lsn::Lsn,
};
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState, ServerInfo, Term,
@@ -47,13 +49,13 @@ pub struct PeerInfo {
}
impl PeerInfo {
fn from_sk_info(sk_id: NodeId, sk_info: &SkTimelineInfo, ts: Instant) -> PeerInfo {
fn from_sk_info(sk_info: &SafekeeperTimelineInfo, ts: Instant) -> PeerInfo {
PeerInfo {
sk_id,
_last_log_term: sk_info.last_log_term.unwrap_or(0),
_flush_lsn: sk_info.flush_lsn.unwrap_or(Lsn::INVALID),
commit_lsn: sk_info.commit_lsn.unwrap_or(Lsn::INVALID),
local_start_lsn: sk_info.local_start_lsn.unwrap_or(Lsn::INVALID),
sk_id: NodeId(sk_info.safekeeper_id),
_last_log_term: sk_info.last_log_term,
_flush_lsn: Lsn(sk_info.flush_lsn),
commit_lsn: Lsn(sk_info.commit_lsn),
local_start_lsn: Lsn(sk_info.local_start_lsn),
ts,
}
}
@@ -308,21 +310,31 @@ impl SharedState {
pos
}
fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
SkTimelineInfo {
last_log_term: Some(self.sk.get_epoch()),
flush_lsn: Some(self.sk.wal_store.flush_lsn()),
fn get_safekeeper_info(
&self,
ttid: &TenantTimelineId,
conf: &SafeKeeperConf,
) -> SafekeeperTimelineInfo {
SafekeeperTimelineInfo {
safekeeper_id: conf.my_id.0,
tenant_timeline_id: Some(ProtoTenantTimelineId {
tenant_id: ttid.tenant_id.as_ref().to_owned(),
timeline_id: ttid.timeline_id.as_ref().to_owned(),
}),
last_log_term: self.sk.get_epoch(),
flush_lsn: self.sk.wal_store.flush_lsn().0,
// note: this value is not flushed to control file yet and can be lost
commit_lsn: Some(self.sk.inmem.commit_lsn),
commit_lsn: self.sk.inmem.commit_lsn.0,
// TODO: rework feedbacks to avoid max here
remote_consistent_lsn: Some(max(
remote_consistent_lsn: max(
self.get_replicas_state().remote_consistent_lsn,
self.sk.inmem.remote_consistent_lsn,
)),
peer_horizon_lsn: Some(self.sk.inmem.peer_horizon_lsn),
safekeeper_connstr: Some(conf.listen_pg_addr.clone()),
backup_lsn: Some(self.sk.inmem.backup_lsn),
local_start_lsn: Some(self.sk.state.local_start_lsn),
)
.0,
peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0,
safekeeper_connstr: conf.listen_pg_addr.clone(),
backup_lsn: self.sk.inmem.backup_lsn.0,
local_start_lsn: self.sk.state.local_start_lsn.0,
}
}
}
@@ -682,23 +694,19 @@ impl Timeline {
}
/// Get safekeeper info for broadcasting to broker and other peers.
pub fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SkTimelineInfo {
pub fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo {
let shared_state = self.write_shared_state();
shared_state.get_safekeeper_info(conf)
shared_state.get_safekeeper_info(&self.ttid, conf)
}
/// Update timeline state with peer safekeeper data.
pub async fn record_safekeeper_info(
&self,
sk_info: &SkTimelineInfo,
sk_id: NodeId,
) -> Result<()> {
pub async fn record_safekeeper_info(&self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
let is_wal_backup_action_pending: bool;
let commit_lsn: Lsn;
{
let mut shared_state = self.write_shared_state();
shared_state.sk.record_safekeeper_info(sk_info)?;
let peer_info = PeerInfo::from_sk_info(sk_id, sk_info, Instant::now());
let peer_info = PeerInfo::from_sk_info(sk_info, Instant::now());
shared_state.peers_info.upsert(&peer_info);
is_wal_backup_action_pending = shared_state.update_status(self.ttid);
commit_lsn = shared_state.sk.inmem.commit_lsn;

View File

@@ -7,9 +7,11 @@ edition = "2021"
bench = []
[dependencies]
anyhow = "1.0"
async-stream = "0.3"
bytes = "1.0"
clap = { version = "4.0", features = ["derive"] }
const_format = "0.2.21"
futures = "0.3"
futures-core = "0.3"
futures-util = "0.3"
@@ -19,7 +21,7 @@ hyper = {version = "0.14.14", features = ["full"]}
once_cell = "1.13.0"
parking_lot = "0.12"
prost = "0.11"
tonic = "0.8"
tonic = {version = "0.8", features = ["tls", "tls-roots"]}
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1"
tracing = "0.1.27"

View File

@@ -6,8 +6,8 @@ use clap::Parser;
use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use storage_broker::proto::{SafekeeperTimelineInfo, SubscribeSafekeeperInfoRequest};
use storage_broker::BrokerClientChannel;
use storage_broker::DEFAULT_LISTEN_ADDR;
use storage_broker::{BrokerClientChannel, DEFAULT_ENDPOINT};
use tokio::time;
use tonic::Request;
@@ -88,9 +88,7 @@ fn tli_from_u64(i: u64) -> Vec<u8> {
async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>, i: u64) {
let mut client = match client {
Some(c) => c,
None => BrokerClientChannel::connect_lazy(format!("http://{}", DEFAULT_LISTEN_ADDR))
.await
.unwrap(),
None => storage_broker::connect(DEFAULT_ENDPOINT).unwrap(),
};
let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
@@ -114,9 +112,7 @@ async fn subscribe(client: Option<BrokerClientChannel>, counter: Arc<AtomicU64>,
async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
let mut client = match client {
Some(c) => c,
None => BrokerClientChannel::connect_lazy(format!("http://{}", DEFAULT_LISTEN_ADDR))
.await
.unwrap(),
None => storage_broker::connect(DEFAULT_ENDPOINT).unwrap(),
};
let mut counter: u64 = 0;
@@ -156,9 +152,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
let h = tokio::spawn(progress_reporter(counters.clone()));
let c = BrokerClientChannel::connect_lazy(format!("http://{}", DEFAULT_LISTEN_ADDR))
.await
.unwrap();
let c = storage_broker::connect(DEFAULT_ENDPOINT).unwrap();
for i in 0..args.num_subs {
let c = Some(c.clone());

View File

@@ -2,6 +2,7 @@ use hyper::body::HttpBody;
use std::pin::Pin;
use std::task::{Context, Poll};
use tonic::codegen::StdError;
use tonic::transport::{ClientTlsConfig, Endpoint};
use tonic::{transport::Channel, Code, Status};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
@@ -20,12 +21,35 @@ pub mod metrics;
pub use tonic::Request;
pub use tonic::Streaming;
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
pub use hyper::Uri;
// NeonBrokerClient charged with tonic provided Channel transport; helps to
pub const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
pub const DEFAULT_ENDPOINT: &str = const_format::formatcp!("http://{DEFAULT_LISTEN_ADDR}");
// BrokerServiceClient charged with tonic provided Channel transport; helps to
// avoid depending on tonic directly in user crates.
pub type BrokerClientChannel = BrokerServiceClient<Channel>;
// Create connection object configured to run TLS if schema starts with https://
// and plain text otherwise. Connection is lazy, only endpoint sanity is
// validated here.
pub fn connect<U>(endpoint: U) -> anyhow::Result<BrokerClientChannel>
where
U: std::convert::TryInto<Uri>,
U::Error: std::error::Error + Send + Sync + 'static,
{
let uri: Uri = endpoint.try_into()?;
let mut tonic_endpoint: Endpoint = uri.into();
// If schema starts with https, start encrypted connection; do plain text
// otherwise.
if let Some("https") = tonic_endpoint.uri().scheme_str() {
let tls = ClientTlsConfig::new();
tonic_endpoint = tonic_endpoint.tls_config(tls)?;
}
let channel = tonic_endpoint.connect_lazy();
Ok(BrokerClientChannel::new(channel))
}
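A usage sketch for the helper (hostname illustrative); connect() performs no I/O itself, the channel dials lazily on the first RPC:

fn connect_examples() -> anyhow::Result<()> {
    // http scheme: plain-text gRPC.
    let _local = storage_broker::connect("http://127.0.0.1:50051")?;
    // https scheme: TLS via ClientTlsConfig; the "tls-roots" feature added in
    // Cargo.toml above is what supplies the system root certificates.
    let _secure = storage_broker::connect("https://storage-broker.example.internal:443")?;
    Ok(())
}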
impl BrokerClientChannel {
/// Create a new client to the given endpoint, but don't actually connect until the first request.
pub async fn connect_lazy<D>(dst: D) -> Result<Self, tonic::transport::Error>

View File

@@ -13,8 +13,6 @@ Prerequisites:
below to run from other directories.
- The neon git repo, including the postgres submodule
(for some tests, e.g. `pg_regress`)
- Some tests (involving storage nodes coordination) require etcd installed. Follow
[`the guide`](https://etcd.io/docs/v3.5/install/) to obtain it.
### Test Organization

View File

@@ -33,7 +33,7 @@ from _pytest.config import Config
from _pytest.fixtures import FixtureRequest
from fixtures.log_helper import log
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import Fn, allure_attach_from_dir, etcd_path, get_self_dir, subprocess_capture
from fixtures.utils import Fn, allure_attach_from_dir, get_self_dir, subprocess_capture
# Type-related stuff
from psycopg2.extensions import connection as PgConnection
@@ -281,19 +281,22 @@ def port_distributor(worker_base_port: int) -> PortDistributor:
@pytest.fixture(scope="session")
def default_broker(
request: FixtureRequest, port_distributor: PortDistributor, top_output_dir: Path
) -> Iterator[Etcd]:
request: FixtureRequest,
port_distributor: PortDistributor,
top_output_dir: Path,
neon_binpath: Path,
) -> Iterator[NeonBroker]:
# multiple pytest sessions could get launched in parallel, get them different ports/datadirs
client_port = port_distributor.get_port()
# multiple pytest sessions could get launched in parallel, get them different datadirs
etcd_datadir = get_test_output_dir(request, top_output_dir) / f"etcd_datadir_{client_port}"
etcd_datadir.mkdir(exist_ok=True, parents=True)
broker = Etcd(
datadir=str(etcd_datadir), port=client_port, peer_port=port_distributor.get_port()
broker_logfile = (
get_test_output_dir(request, top_output_dir) / f"storage_broker_{client_port}.log"
)
broker_logfile.parents[0].mkdir(exist_ok=True, parents=True)
broker = NeonBroker(logfile=broker_logfile, port=client_port, neon_binpath=neon_binpath)
yield broker
broker.stop()
allure_attach_from_dir(etcd_datadir)
allure_attach_from_dir(Path(broker_logfile))
@pytest.fixture(scope="session")
@@ -570,7 +573,7 @@ class NeonEnvBuilder:
self,
repo_dir: Path,
port_distributor: PortDistributor,
broker: Etcd,
broker: NeonBroker,
run_id: uuid.UUID,
mock_s3_server: MockS3Server,
neon_binpath: Path,
@@ -846,9 +849,8 @@ class NeonEnv:
toml += textwrap.dedent(
f"""
[etcd_broker]
broker_endpoints = ['{self.broker.client_url()}']
etcd_binary_path = '{self.broker.binary_path}'
[broker]
listen_addr = '{self.broker.listen_addr()}'
"""
)
@@ -949,7 +951,7 @@ def _shared_simple_env(
request: FixtureRequest,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
default_broker: Etcd,
default_broker: NeonBroker,
run_id: uuid.UUID,
top_output_dir: Path,
neon_binpath: Path,
@@ -1010,7 +1012,7 @@ def neon_env_builder(
neon_binpath: Path,
pg_distrib_dir: Path,
pg_version: str,
default_broker: Etcd,
default_broker: NeonBroker,
run_id: uuid.UUID,
) -> Iterator[NeonEnvBuilder]:
"""
@@ -1743,7 +1745,7 @@ class NeonPageserver(PgProtocol):
# All tests print these, when starting up or shutting down
".*wal receiver task finished with an error: walreceiver connection handling failure.*",
".*Shutdown task error: walreceiver connection handling failure.*",
".*Etcd client error: grpc request error: status: Unavailable.*",
".*wal_connection_manager.*tcp connect error: Connection refused.*",
".*query handler for .* failed: Connection reset by peer.*",
".*serving compute connection task.*exited with error: Broken pipe.*",
".*Connection aborted: error communicating with the server: Broken pipe.*",
@@ -1834,7 +1836,6 @@ class NeonPageserver(PgProtocol):
def assert_no_errors(self):
logfile = open(os.path.join(self.env.repo_dir, "pageserver.log"), "r")
error_or_warn = re.compile("ERROR|WARN")
errors = []
while True:
@@ -2653,51 +2654,36 @@ class SafekeeperHttpClient(requests.Session):
@dataclass
class Etcd:
"""An object managing etcd instance"""
class NeonBroker:
"""An object managing storage_broker instance"""
datadir: str
logfile: Path
port: int
peer_port: int
binary_path: Path = field(init=False)
neon_binpath: Path
handle: Optional[subprocess.Popen[Any]] = None # handle of running daemon
def __post_init__(self):
self.binary_path = etcd_path()
def listen_addr(self):
return f"127.0.0.1:{self.port}"
def client_url(self):
return f"http://127.0.0.1:{self.port}"
return f"http://{self.listen_addr()}"
def check_status(self):
with requests.Session() as s:
s.mount("http://", requests.adapters.HTTPAdapter(max_retries=1)) # do not retry
s.get(f"{self.client_url()}/health").raise_for_status()
return True # TODO
def try_start(self):
if self.handle is not None:
log.debug(f"etcd is already running on port {self.port}")
log.debug(f"storage_broker is already running on port {self.port}")
return
Path(self.datadir).mkdir(exist_ok=True)
if not self.binary_path.is_file():
raise RuntimeError(f"etcd broker binary '{self.binary_path}' is not a file")
client_url = self.client_url()
log.info(f'Starting etcd to listen incoming connections at "{client_url}"')
with open(os.path.join(self.datadir, "etcd.log"), "wb") as log_file:
listen_addr = self.listen_addr()
log.info(f'starting storage_broker to listen for incoming connections at "{listen_addr}"')
with open(self.logfile, "wb") as logfile:
args = [
self.binary_path,
f"--data-dir={self.datadir}",
f"--listen-client-urls={client_url}",
f"--advertise-client-urls={client_url}",
f"--listen-peer-urls=http://127.0.0.1:{self.peer_port}",
# Set --quota-backend-bytes to keep the etcd virtual memory
# size smaller. Our test etcd clusters are very small.
# See https://github.com/etcd-io/etcd/issues/7910
"--quota-backend-bytes=100000000",
self.neon_binpath / "storage_broker",
f"--listen-addr={listen_addr}",
]
self.handle = subprocess.Popen(args, stdout=log_file, stderr=log_file)
self.handle = subprocess.Popen(args, stdout=logfile, stderr=logfile)
# wait for start
started_at = time.time()
@@ -2707,7 +2693,9 @@ class Etcd:
except Exception as e:
elapsed = time.time() - started_at
if elapsed > 5:
raise RuntimeError(f"timed out waiting {elapsed:.0f}s for etcd start: {e}")
raise RuntimeError(
f"timed out waiting {elapsed:.0f}s for storage_broker start: {e}"
)
time.sleep(0.5)
else:
break # success
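
The fixture drives the binary purely through --listen-addr. For orientation, a sketch of how such a flag could be declared on the Rust side with the clap derive style used elsewhere in this commit (struct and field names are illustrative, not the actual storage_broker source):

use clap::Parser;

#[derive(Parser)]
struct Args {
    /// Address the broker binds to, e.g. 127.0.0.1:50051.
    #[clap(long, default_value = "127.0.0.1:50051")]
    listen_addr: String,
}

fn main() {
    let args = Args::parse();
    println!("listening on {}", args.listen_addr);
}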

View File

@@ -1,7 +1,6 @@
import contextlib
import os
import re
import shutil
import subprocess
import tarfile
import time
@@ -74,13 +73,6 @@ def print_gc_result(row: Dict[str, Any]):
)
def etcd_path() -> Path:
path_output = shutil.which("etcd")
if path_output is None:
raise RuntimeError("etcd not found in PATH")
return Path(path_output)
def query_scalar(cur: cursor, query: str) -> Any:
"""
It is a convenience wrapper to avoid repetitions

View File

@@ -97,17 +97,19 @@ def test_backward_compatibility(
), "COMPATIBILITY_SNAPSHOT_DIR is not set. It should be set to `compatibility_snapshot_pg14` path generateted by test_create_snapshot (ideally generated by the previous version of Neon)"
compatibility_snapshot_dir = Path(compatibility_snapshot_dir_env).resolve()
# Copy the snapshot to current directory, and prepare for the test
prepare_snapshot(
from_dir=compatibility_snapshot_dir,
to_dir=test_output_dir / "compatibility_snapshot",
port_distributor=port_distributor,
)
breaking_changes_allowed = (
os.environ.get("ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"
)
try:
# Copy the snapshot to current directory, and prepare for the test
prepare_snapshot(
from_dir=compatibility_snapshot_dir,
to_dir=test_output_dir / "compatibility_snapshot",
neon_binpath=neon_binpath,
port_distributor=port_distributor,
)
check_neon_works(
test_output_dir / "compatibility_snapshot" / "repo",
neon_binpath,
@@ -155,18 +157,21 @@ def test_forward_compatibility(
compatibility_snapshot_dir = (
test_output_dir.parent / "test_create_snapshot" / "compatibility_snapshot_pg14"
)
# Copy the snapshot to current directory, and prepare for the test
prepare_snapshot(
from_dir=compatibility_snapshot_dir,
to_dir=test_output_dir / "compatibility_snapshot",
port_distributor=port_distributor,
pg_distrib_dir=compatibility_postgres_distrib_dir,
)
breaking_changes_allowed = (
os.environ.get("ALLOW_FORWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"
)
try:
# Copy the snapshot to current directory, and prepare for the test
prepare_snapshot(
from_dir=compatibility_snapshot_dir,
to_dir=test_output_dir / "compatibility_snapshot",
port_distributor=port_distributor,
neon_binpath=compatibility_neon_bin,
pg_distrib_dir=compatibility_postgres_distrib_dir,
)
check_neon_works(
test_output_dir / "compatibility_snapshot" / "repo",
compatibility_neon_bin,
@@ -194,6 +199,7 @@ def prepare_snapshot(
from_dir: Path,
to_dir: Path,
port_distributor: PortDistributor,
neon_binpath: Path,
pg_distrib_dir: Optional[Path] = None,
):
assert from_dir.exists(), f"Snapshot '{from_dir}' doesn't exist"
@@ -227,9 +233,14 @@ def prepare_snapshot(
pageserver_config["listen_pg_addr"] = port_distributor.replace_with_new_port(
pageserver_config["listen_pg_addr"]
)
pageserver_config["broker_endpoints"] = [
port_distributor.replace_with_new_port(ep) for ep in pageserver_config["broker_endpoints"]
]
# Since the switch to storage_broker these are overridden by neon_local during
# pageserver start; remove both to prevent unknown options during the etcd ->
# storage_broker migration. TODO: remove once the broker is released
pageserver_config.pop("broker_endpoint", None)
pageserver_config.pop("broker_endpoints", None)
etcd_broker_endpoints = [f"http://localhost:{port_distributor.get_port()}/"]
if get_neon_version(neon_binpath) == "49da498f651b9f3a53b56c7c0697636d880ddfe0":
pageserver_config["broker_endpoints"] = etcd_broker_endpoints # old etcd version
if pg_distrib_dir:
pageserver_config["pg_distrib_dir"] = str(pg_distrib_dir)
@@ -239,10 +250,22 @@ def prepare_snapshot(
snapshot_config_toml = repo_dir / "config"
snapshot_config = toml.load(snapshot_config_toml)
snapshot_config["etcd_broker"]["broker_endpoints"] = [
port_distributor.replace_with_new_port(ep)
for ep in snapshot_config["etcd_broker"]["broker_endpoints"]
]
# Provide an etcd <-> storage_broker up/downgrade path to keep the forward/backward
# compatibility tests happy. TODO: leave only the new part once the broker is released.
if get_neon_version(neon_binpath) == "49da498f651b9f3a53b56c7c0697636d880ddfe0":
# old etcd version
snapshot_config["etcd_broker"] = {
"etcd_binary_path": shutil.which("etcd"),
"broker_endpoints": etcd_broker_endpoints,
}
snapshot_config.pop("broker", None)
else:
# new storage_broker version
broker_listen_addr = f"127.0.0.1:{port_distributor.get_port()}"
snapshot_config["broker"] = {"listen_addr": broker_listen_addr}
snapshot_config.pop("etcd_broker", None)
snapshot_config["pageserver"]["listen_http_addr"] = port_distributor.replace_with_new_port(
snapshot_config["pageserver"]["listen_http_addr"]
)
@@ -277,6 +300,12 @@ def prepare_snapshot(
), f"there're files referencing `test_create_snapshot/repo`, this path should be replaced with {repo_dir}:\n{rv.stdout}"
# get git SHA of neon binary
def get_neon_version(neon_binpath: Path):
out = subprocess.check_output([neon_binpath / "neon_local", "--version"]).decode("utf-8")
return out.split("git:", 1)[1].rstrip()
def check_neon_works(
repo_dir: Path,
neon_binpath: Path,

View File

@@ -7,7 +7,7 @@ from typing import Any, Dict, Optional, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Etcd,
NeonBroker,
NeonEnv,
NeonEnvBuilder,
PageserverHttpClient,
@@ -32,7 +32,7 @@ def new_pageserver_service(
remote_storage_mock_path: Path,
pg_port: int,
http_port: int,
broker: Optional[Etcd],
broker: Optional[NeonBroker],
pg_distrib_dir: Path,
):
"""
@@ -53,7 +53,7 @@ def new_pageserver_service(
]
if broker is not None:
cmd.append(
f"-c broker_endpoints=['{broker.client_url()}']",
f"-c broker_endpoint='{broker.client_url()}'",
)
pageserver_client = PageserverHttpClient(
port=http_port,

View File

@@ -16,7 +16,7 @@ from typing import Any, List, Optional
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Etcd,
NeonBroker,
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
@@ -520,7 +520,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re
)
# advance remote_consistent_lsn to trigger WAL trimming
# this LSN should be less than commit_lsn, so timeline will be active=true in safekeepers, to push etcd updates
# this LSN should be less than commit_lsn, so the timeline will be active=true on safekeepers and push broker updates
env.safekeepers[0].http_client().record_safekeeper_info(
tenant_id, timeline_id, {"remote_consistent_lsn": str(offloaded_seg_end)}
)
@@ -812,10 +812,10 @@ class SafekeeperEnv:
):
self.repo_dir = repo_dir
self.port_distributor = port_distributor
self.broker = Etcd(
datadir=os.path.join(self.repo_dir, "etcd"),
self.broker = NeonBroker(
logfile=Path(self.repo_dir) / "storage_broker.log",
port=self.port_distributor.get_port(),
peer_port=self.port_distributor.get_port(),
neon_binpath=neon_binpath,
)
self.pg_bin = pg_bin
self.num_safekeepers = num_safekeepers
@@ -863,7 +863,7 @@ class SafekeeperEnv:
str(safekeeper_dir),
"--id",
str(i),
"--broker-endpoints",
"--broker-endpoint",
self.broker.client_url(),
]
log.info(f'Running command "{" ".join(cmd)}"')

View File

@@ -32,14 +32,14 @@ nom = { version = "7", features = ["alloc", "std"] }
num-bigint = { version = "0.4", features = ["std"] }
num-integer = { version = "0.1", features = ["i128", "std"] }
num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
prost-93f6ce9d446188ac = { package = "prost", version = "0.10", features = ["prost-derive", "std"] }
prost-a6292c17cd707f01 = { package = "prost", version = "0.11", features = ["prost-derive", "std"] }
prost = { version = "0.11", features = ["prost-derive", "std"] }
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
reqwest = { version = "0.11", default-features = false, features = ["__rustls", "__tls", "blocking", "default-tls", "hyper-rustls", "hyper-tls", "json", "native-tls-crate", "rustls", "rustls-pemfile", "rustls-tls", "rustls-tls-webpki-roots", "serde_json", "tokio-native-tls", "tokio-rustls", "webpki-roots"] }
scopeguard = { version = "1", features = ["use_std"] }
serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] }
socket2 = { version = "0.4", default-features = false, features = ["all"] }
stable_deref_trait = { version = "1", features = ["alloc", "std"] }
tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "process", "rt", "rt-multi-thread", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros"] }
tokio-util = { version = "0.7", features = ["codec", "io", "io-util", "tracing"] }
@@ -59,8 +59,7 @@ libc = { version = "0.2", features = ["extra_traits", "std"] }
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
memchr = { version = "2", features = ["std"] }
nom = { version = "7", features = ["alloc", "std"] }
prost-93f6ce9d446188ac = { package = "prost", version = "0.10", features = ["prost-derive", "std"] }
prost-a6292c17cd707f01 = { package = "prost", version = "0.11", features = ["prost-derive", "std"] }
prost = { version = "0.11", features = ["prost-derive", "std"] }
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] }