mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 00:50:36 +00:00
eliminate one workaround, convert sk stuff to async as well
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1245,6 +1245,7 @@ name = "control_plane"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"camino",
|
||||
"clap",
|
||||
"comfy-table",
|
||||
|
||||
@@ -6,6 +6,7 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-trait.workspace = true
|
||||
camino.workspace = true
|
||||
clap.workspace = true
|
||||
comfy-table.workspace = true
|
||||
|
||||
@@ -64,7 +64,7 @@ impl AttachmentService {
|
||||
.expect("non-Unicode path")
|
||||
}
|
||||
|
||||
pub fn start(&self) -> anyhow::Result<Child> {
|
||||
pub async fn start(&self) -> anyhow::Result<Child> {
|
||||
let path_str = self.path.to_string_lossy();
|
||||
|
||||
background_process::start_process(
|
||||
@@ -73,10 +73,11 @@ impl AttachmentService {
|
||||
&self.env.attachment_service_bin(),
|
||||
["-l", &self.listen, "-p", &path_str],
|
||||
[],
|
||||
background_process::InitialPidFile::Create(&self.pid_file()),
|
||||
background_process::InitialPidFile::Create(self.pid_file()),
|
||||
// TODO: a real status check
|
||||
|| Ok(true),
|
||||
|| async move { anyhow::Ok(true) },
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub fn stop(&self, immediate: bool) -> anyhow::Result<()> {
|
||||
|
||||
@@ -44,15 +44,15 @@ const NOTICE_AFTER_RETRIES: u64 = 50;
|
||||
|
||||
/// Argument to `start_process`, to indicate whether it should create pidfile or if the process creates
|
||||
/// it itself.
|
||||
pub enum InitialPidFile<'t> {
|
||||
pub enum InitialPidFile {
|
||||
/// Create a pidfile, to allow future CLI invocations to manipulate the process.
|
||||
Create(&'t Utf8Path),
|
||||
Create(Utf8PathBuf),
|
||||
/// The process will create the pidfile itself, need to wait for that event.
|
||||
Expect(&'t Utf8Path),
|
||||
Expect(Utf8PathBuf),
|
||||
}
|
||||
|
||||
/// Start a background child process using the parameters given.
|
||||
pub fn start_process<F, AI, A, EI>(
|
||||
pub async fn start_process<F, Fut, AI, A, EI>(
|
||||
process_name: &str,
|
||||
datadir: &Path,
|
||||
command: &Path,
|
||||
@@ -62,7 +62,8 @@ pub fn start_process<F, AI, A, EI>(
|
||||
process_status_check: F,
|
||||
) -> anyhow::Result<Child>
|
||||
where
|
||||
F: Fn() -> anyhow::Result<bool>,
|
||||
F: Fn() -> Fut,
|
||||
Fut: std::future::Future<Output = anyhow::Result<bool>>,
|
||||
AI: IntoIterator<Item = A>,
|
||||
A: AsRef<OsStr>,
|
||||
// Not generic AsRef<OsStr>, otherwise empty `envs` prevents type inference
|
||||
@@ -89,7 +90,7 @@ where
|
||||
let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command));
|
||||
filled_cmd.envs(envs);
|
||||
|
||||
let pid_file_to_check = match initial_pid_file {
|
||||
let pid_file_to_check = match &initial_pid_file {
|
||||
InitialPidFile::Create(path) => {
|
||||
pre_exec_create_pidfile(filled_cmd, path);
|
||||
path
|
||||
@@ -107,7 +108,7 @@ where
|
||||
);
|
||||
|
||||
for retries in 0..RETRIES {
|
||||
match process_started(pid, Some(pid_file_to_check), &process_status_check) {
|
||||
match process_started(pid, &pid_file_to_check, &process_status_check).await {
|
||||
Ok(true) => {
|
||||
println!("\n{process_name} started, pid: {pid}");
|
||||
return Ok(spawned_process);
|
||||
@@ -316,22 +317,20 @@ where
|
||||
cmd
|
||||
}
|
||||
|
||||
fn process_started<F>(
|
||||
async fn process_started<F, Fut>(
|
||||
pid: Pid,
|
||||
pid_file_to_check: Option<&Utf8Path>,
|
||||
pid_file_to_check: &Utf8Path,
|
||||
status_check: &F,
|
||||
) -> anyhow::Result<bool>
|
||||
where
|
||||
F: Fn() -> anyhow::Result<bool>,
|
||||
F: Fn() -> Fut,
|
||||
Fut: std::future::Future<Output = anyhow::Result<bool>>,
|
||||
{
|
||||
match status_check() {
|
||||
Ok(true) => match pid_file_to_check {
|
||||
Some(pid_file_path) => match pid_file::read(pid_file_path)? {
|
||||
PidFileRead::NotExist => Ok(false),
|
||||
PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid),
|
||||
PidFileRead::NotHeldByAnyProcess(_) => Ok(false),
|
||||
},
|
||||
None => Ok(true),
|
||||
match status_check().await {
|
||||
Ok(true) => match pid_file::read(pid_file_to_check)? {
|
||||
PidFileRead::NotExist => Ok(false),
|
||||
PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid),
|
||||
PidFileRead::NotHeldByAnyProcess(_) => Ok(false),
|
||||
},
|
||||
Ok(false) => Ok(false),
|
||||
Err(e) => anyhow::bail!("process failed to start: {e}"),
|
||||
|
||||
@@ -128,11 +128,11 @@ fn main() -> Result<()> {
|
||||
let subcommand_result = match sub_name {
|
||||
"tenant" => rt.block_on(handle_tenant(sub_args, &mut env)),
|
||||
"timeline" => rt.block_on(handle_timeline(sub_args, &mut env)),
|
||||
"start" => handle_start_all(sub_args, &env),
|
||||
"start" => rt.block_on(handle_start_all(sub_args, &env)),
|
||||
"stop" => handle_stop_all(sub_args, &env),
|
||||
"pageserver" => rt.block_on(handle_pageserver(sub_args, &env)),
|
||||
"attachment_service" => handle_attachment_service(sub_args, &env),
|
||||
"safekeeper" => handle_safekeeper(sub_args, &env),
|
||||
"attachment_service" => rt.block_on(handle_attachment_service(sub_args, &env)),
|
||||
"safekeeper" => rt.block_on(handle_safekeeper(sub_args, &env)),
|
||||
"endpoint" => rt.block_on(handle_endpoint(sub_args, &env)),
|
||||
"mappings" => handle_mappings(sub_args, &mut env),
|
||||
"pg" => bail!("'pg' subcommand has been renamed to 'endpoint'"),
|
||||
@@ -560,7 +560,9 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
|
||||
|
||||
let mut cplane = ComputeControlPlane::load(env.clone())?;
|
||||
println!("Importing timeline into pageserver ...");
|
||||
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version).await?;
|
||||
pageserver
|
||||
.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)
|
||||
.await?;
|
||||
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
|
||||
|
||||
println!("Creating endpoint for imported timeline ...");
|
||||
@@ -904,6 +906,7 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
|
||||
Some(("start", subcommand_args)) => {
|
||||
if let Err(e) = get_pageserver(env, subcommand_args)?
|
||||
.start(&pageserver_config_overrides(subcommand_args))
|
||||
.await
|
||||
{
|
||||
eprintln!("pageserver start failed: {e}");
|
||||
exit(1);
|
||||
@@ -930,7 +933,10 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) {
|
||||
if let Err(e) = pageserver
|
||||
.start(&pageserver_config_overrides(subcommand_args))
|
||||
.await
|
||||
{
|
||||
eprintln!("pageserver start failed: {e}");
|
||||
exit(1);
|
||||
}
|
||||
@@ -944,7 +950,10 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if let Err(e) = pageserver.start(&pageserver_config_overrides(subcommand_args)) {
|
||||
if let Err(e) = pageserver
|
||||
.start(&pageserver_config_overrides(subcommand_args))
|
||||
.await
|
||||
{
|
||||
eprintln!("pageserver start failed: {e}");
|
||||
exit(1);
|
||||
}
|
||||
@@ -966,11 +975,14 @@ async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) ->
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_attachment_service(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
async fn handle_attachment_service(
|
||||
sub_match: &ArgMatches,
|
||||
env: &local_env::LocalEnv,
|
||||
) -> Result<()> {
|
||||
let svc = AttachmentService::from_env(env);
|
||||
match sub_match.subcommand() {
|
||||
Some(("start", _start_match)) => {
|
||||
if let Err(e) = svc.start() {
|
||||
if let Err(e) = svc.start().await {
|
||||
eprintln!("start failed: {e}");
|
||||
exit(1);
|
||||
}
|
||||
@@ -1011,7 +1023,7 @@ fn safekeeper_extra_opts(init_match: &ArgMatches) -> Vec<String> {
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
async fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
|
||||
let (sub_name, sub_args) = match sub_match.subcommand() {
|
||||
Some(safekeeper_command_data) => safekeeper_command_data,
|
||||
None => bail!("no safekeeper subcommand provided"),
|
||||
@@ -1029,7 +1041,7 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
|
||||
"start" => {
|
||||
let extra_opts = safekeeper_extra_opts(sub_args);
|
||||
|
||||
if let Err(e) = safekeeper.start(extra_opts) {
|
||||
if let Err(e) = safekeeper.start(extra_opts).await {
|
||||
eprintln!("safekeeper start failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
@@ -1055,7 +1067,7 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
|
||||
}
|
||||
|
||||
let extra_opts = safekeeper_extra_opts(sub_args);
|
||||
if let Err(e) = safekeeper.start(extra_opts) {
|
||||
if let Err(e) = safekeeper.start(extra_opts).await {
|
||||
eprintln!("safekeeper start failed: {}", e);
|
||||
exit(1);
|
||||
}
|
||||
@@ -1068,15 +1080,15 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> {
|
||||
async fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> {
|
||||
// Endpoints are not started automatically
|
||||
|
||||
broker::start_broker_process(env)?;
|
||||
broker::start_broker_process(env).await?;
|
||||
|
||||
// Only start the attachment service if the pageserver is configured to need it
|
||||
if env.control_plane_api.is_some() {
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
if let Err(e) = attachment_service.start() {
|
||||
if let Err(e) = attachment_service.start().await {
|
||||
eprintln!("attachment_service start failed: {:#}", e);
|
||||
try_stop_all(env, true);
|
||||
exit(1);
|
||||
@@ -1085,7 +1097,10 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow
|
||||
|
||||
for ps_conf in &env.pageservers {
|
||||
let pageserver = PageServerNode::from_env(env, ps_conf);
|
||||
if let Err(e) = pageserver.start(&pageserver_config_overrides(sub_match)) {
|
||||
if let Err(e) = pageserver
|
||||
.start(&pageserver_config_overrides(sub_match))
|
||||
.await
|
||||
{
|
||||
eprintln!("pageserver {} start failed: {:#}", ps_conf.id, e);
|
||||
try_stop_all(env, true);
|
||||
exit(1);
|
||||
@@ -1094,7 +1109,7 @@ fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow
|
||||
|
||||
for node in env.safekeepers.iter() {
|
||||
let safekeeper = SafekeeperNode::from_env(env, node);
|
||||
if let Err(e) = safekeeper.start(vec![]) {
|
||||
if let Err(e) = safekeeper.start(vec![]).await {
|
||||
eprintln!("safekeeper {} start failed: {:#}", safekeeper.id, e);
|
||||
try_stop_all(env, false);
|
||||
exit(1);
|
||||
|
||||
@@ -11,7 +11,7 @@ use camino::Utf8PathBuf;
|
||||
|
||||
use crate::{background_process, local_env};
|
||||
|
||||
pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
|
||||
pub async fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
|
||||
let broker = &env.broker;
|
||||
let listen_addr = &broker.listen_addr;
|
||||
|
||||
@@ -26,8 +26,8 @@ pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
|
||||
&env.storage_broker_bin(),
|
||||
args,
|
||||
[],
|
||||
background_process::InitialPidFile::Create(&storage_broker_pid_file_path(env)),
|
||||
|| {
|
||||
background_process::InitialPidFile::Create(storage_broker_pid_file_path(env)),
|
||||
|| async {
|
||||
let url = broker.client_url();
|
||||
let status_url = url.join("status").with_context(|| {
|
||||
format!("Failed to append /status path to broker endpoint {url}")
|
||||
@@ -42,6 +42,7 @@ pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
|
||||
}
|
||||
},
|
||||
)
|
||||
.await
|
||||
.context("Failed to spawn storage_broker subprocess")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -149,8 +149,8 @@ impl PageServerNode {
|
||||
.expect("non-Unicode path")
|
||||
}
|
||||
|
||||
pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<Child> {
|
||||
self.start_node(config_overrides, false)
|
||||
pub async fn start(&self, config_overrides: &[&str]) -> anyhow::Result<Child> {
|
||||
self.start_node(config_overrides, false).await
|
||||
}
|
||||
|
||||
fn pageserver_init(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
|
||||
@@ -191,53 +191,52 @@ impl PageServerNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn start_node(&self, config_overrides: &[&str], update_config: bool) -> anyhow::Result<Child> {
|
||||
async fn start_node(
|
||||
&self,
|
||||
config_overrides: &[&str],
|
||||
update_config: bool,
|
||||
) -> anyhow::Result<Child> {
|
||||
// TODO: using a thread here because start_process() is not async but we need to call check_status()
|
||||
std::thread::scope(move |s| {
|
||||
s.spawn(move || {
|
||||
let datadir = self.repo_path();
|
||||
print!(
|
||||
"Starting pageserver node {} at '{}' in {:?}",
|
||||
self.conf.id,
|
||||
self.pg_connection_config.raw_address(),
|
||||
datadir
|
||||
);
|
||||
io::stdout().flush().context("flush stdout")?;
|
||||
let datadir = self.repo_path();
|
||||
print!(
|
||||
"Starting pageserver node {} at '{}' in {:?}",
|
||||
self.conf.id,
|
||||
self.pg_connection_config.raw_address(),
|
||||
datadir
|
||||
);
|
||||
io::stdout().flush().context("flush stdout")?;
|
||||
|
||||
let datadir_path_str = datadir.to_str().with_context(|| {
|
||||
format!(
|
||||
"Cannot start pageserver node {} in path that has no string representation: {:?}",
|
||||
self.conf.id, datadir,
|
||||
)
|
||||
})?;
|
||||
let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
|
||||
if update_config {
|
||||
args.push(Cow::Borrowed("--update-config"));
|
||||
let datadir_path_str = datadir.to_str().with_context(|| {
|
||||
format!(
|
||||
"Cannot start pageserver node {} in path that has no string representation: {:?}",
|
||||
self.conf.id, datadir,
|
||||
)
|
||||
})?;
|
||||
let mut args = self.pageserver_basic_args(config_overrides, datadir_path_str);
|
||||
if update_config {
|
||||
args.push(Cow::Borrowed("--update-config"));
|
||||
}
|
||||
background_process::start_process(
|
||||
"pageserver",
|
||||
&datadir,
|
||||
&self.env.pageserver_bin(),
|
||||
args.iter().map(Cow::as_ref),
|
||||
self.pageserver_env_variables()?,
|
||||
background_process::InitialPidFile::Expect(self.pid_file()),
|
||||
|| async {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
let st = rt.block_on(self.check_status());
|
||||
match st {
|
||||
Ok(()) => Ok(true),
|
||||
Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false),
|
||||
Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
|
||||
}
|
||||
background_process::start_process(
|
||||
"pageserver",
|
||||
&datadir,
|
||||
&self.env.pageserver_bin(),
|
||||
args.iter().map(Cow::as_ref),
|
||||
self.pageserver_env_variables()?,
|
||||
background_process::InitialPidFile::Expect(&self.pid_file()),
|
||||
|| {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
let st = rt.block_on(self.check_status());
|
||||
match st {
|
||||
Ok(()) => Ok(true),
|
||||
Err(mgmt_api::Error::ReceiveBody(_)) => Ok(false),
|
||||
Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
|
||||
}
|
||||
},
|
||||
)
|
||||
})
|
||||
.join()
|
||||
.unwrap()
|
||||
})
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn pageserver_basic_args<'a>(
|
||||
|
||||
@@ -13,7 +13,6 @@ use std::{io, result};
|
||||
use anyhow::Context;
|
||||
use camino::Utf8PathBuf;
|
||||
use postgres_connection::PgConnectionConfig;
|
||||
use reqwest::blocking::{Client, RequestBuilder, Response};
|
||||
use reqwest::{IntoUrl, Method};
|
||||
use thiserror::Error;
|
||||
use utils::{http::error::HttpErrorBody, id::NodeId};
|
||||
@@ -34,12 +33,14 @@ pub enum SafekeeperHttpError {
|
||||
|
||||
type Result<T> = result::Result<T, SafekeeperHttpError>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait ResponseErrorMessageExt: Sized {
|
||||
fn error_from_body(self) -> Result<Self>;
|
||||
async fn error_from_body(self) -> Result<Self>;
|
||||
}
|
||||
|
||||
impl ResponseErrorMessageExt for Response {
|
||||
fn error_from_body(self) -> Result<Self> {
|
||||
#[async_trait::async_trait]
|
||||
impl ResponseErrorMessageExt for reqwest::Response {
|
||||
async fn error_from_body(self) -> Result<Self> {
|
||||
let status = self.status();
|
||||
if !(status.is_client_error() || status.is_server_error()) {
|
||||
return Ok(self);
|
||||
@@ -48,7 +49,7 @@ impl ResponseErrorMessageExt for Response {
|
||||
// reqwest does not export its error construction utility functions, so let's craft the message ourselves
|
||||
let url = self.url().to_owned();
|
||||
Err(SafekeeperHttpError::Response(
|
||||
match self.json::<HttpErrorBody>() {
|
||||
match self.json::<HttpErrorBody>().await {
|
||||
Ok(err_body) => format!("Error: {}", err_body.msg),
|
||||
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
|
||||
},
|
||||
@@ -69,7 +70,7 @@ pub struct SafekeeperNode {
|
||||
|
||||
pub pg_connection_config: PgConnectionConfig,
|
||||
pub env: LocalEnv,
|
||||
pub http_client: Client,
|
||||
pub http_client: reqwest::Client,
|
||||
pub http_base_url: String,
|
||||
}
|
||||
|
||||
@@ -80,7 +81,7 @@ impl SafekeeperNode {
|
||||
conf: conf.clone(),
|
||||
pg_connection_config: Self::safekeeper_connection_config(conf.pg_port),
|
||||
env: env.clone(),
|
||||
http_client: Client::new(),
|
||||
http_client: reqwest::Client::new(),
|
||||
http_base_url: format!("http://127.0.0.1:{}/v1", conf.http_port),
|
||||
}
|
||||
}
|
||||
@@ -103,7 +104,7 @@ impl SafekeeperNode {
|
||||
.expect("non-Unicode path")
|
||||
}
|
||||
|
||||
pub fn start(&self, extra_opts: Vec<String>) -> anyhow::Result<Child> {
|
||||
pub async fn start(&self, extra_opts: Vec<String>) -> anyhow::Result<Child> {
|
||||
print!(
|
||||
"Starting safekeeper at '{}' in '{}'",
|
||||
self.pg_connection_config.raw_address(),
|
||||
@@ -191,13 +192,16 @@ impl SafekeeperNode {
|
||||
&self.env.safekeeper_bin(),
|
||||
&args,
|
||||
[],
|
||||
background_process::InitialPidFile::Expect(&self.pid_file()),
|
||||
|| match self.check_status() {
|
||||
Ok(()) => Ok(true),
|
||||
Err(SafekeeperHttpError::Transport(_)) => Ok(false),
|
||||
Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
|
||||
background_process::InitialPidFile::Expect(self.pid_file()),
|
||||
|| async {
|
||||
match self.check_status().await {
|
||||
Ok(()) => Ok(true),
|
||||
Err(SafekeeperHttpError::Transport(_)) => Ok(false),
|
||||
Err(e) => Err(anyhow::anyhow!("Failed to check node status: {e}")),
|
||||
}
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
///
|
||||
@@ -216,7 +220,7 @@ impl SafekeeperNode {
|
||||
)
|
||||
}
|
||||
|
||||
fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
|
||||
fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> reqwest::RequestBuilder {
|
||||
// TODO: authentication
|
||||
//if self.env.auth_type == AuthType::NeonJWT {
|
||||
// builder = builder.bearer_auth(&self.env.safekeeper_auth_token)
|
||||
@@ -224,10 +228,12 @@ impl SafekeeperNode {
|
||||
self.http_client.request(method, url)
|
||||
}
|
||||
|
||||
pub fn check_status(&self) -> Result<()> {
|
||||
pub async fn check_status(&self) -> Result<()> {
|
||||
self.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status"))
|
||||
.send()?
|
||||
.error_from_body()?;
|
||||
.send()
|
||||
.await?
|
||||
.error_from_body()
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user