mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
refactor
This commit is contained in:
@@ -42,6 +42,7 @@ use std::{thread, time::Duration};
|
||||
use anyhow::{Context, Result};
|
||||
use chrono::Utc;
|
||||
use clap::Arg;
|
||||
use tokio::runtime::Runtime;
|
||||
use tracing::{error, info};
|
||||
use url::Url;
|
||||
|
||||
@@ -49,14 +50,13 @@ use compute_api::responses::ComputeStatus;
|
||||
|
||||
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
|
||||
use compute_tools::configurator::launch_configurator;
|
||||
use compute_tools::extension_server::{get_availiable_extensions, init_remote_storage};
|
||||
use compute_tools::extension_server::init_remote_storage;
|
||||
use compute_tools::http::api::launch_http_server;
|
||||
use compute_tools::logger::*;
|
||||
use compute_tools::monitor::launch_monitor;
|
||||
use compute_tools::params::*;
|
||||
use compute_tools::spec::*;
|
||||
|
||||
use tokio::runtime::Runtime;
|
||||
const BUILD_TAG_DEFAULT: &str = "local";
|
||||
|
||||
fn main() -> Result<()> {
|
||||
@@ -76,13 +76,6 @@ fn main() -> Result<()> {
|
||||
None => None,
|
||||
};
|
||||
|
||||
// let rt0 = Runtime::new().unwrap();
|
||||
// rt0.block_on(async {
|
||||
// download_extension(&ext_remote_storage, ExtensionType::Shared, pgbin)
|
||||
// .await
|
||||
// .expect("download shared extensions should work");
|
||||
// });
|
||||
|
||||
let http_port = *matches
|
||||
.get_one::<u16>("http-port")
|
||||
.expect("http-port is required");
|
||||
@@ -186,7 +179,7 @@ fn main() -> Result<()> {
|
||||
let tenant_id;
|
||||
if let Some(spec) = spec {
|
||||
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
|
||||
tenant_id = Some(pspec.tenant_id.to_string());
|
||||
tenant_id = Some(pspec.tenant_id);
|
||||
new_state.pspec = Some(pspec);
|
||||
spec_set = true;
|
||||
} else {
|
||||
@@ -214,17 +207,14 @@ fn main() -> Result<()> {
|
||||
let extension_server_port: u16 = http_port;
|
||||
|
||||
// exen before we have spec, we can get public availiable extensions
|
||||
// TODO maybe convert get_availiable_extensions into ComputeNode method as well as other functions
|
||||
// TODO maybe convert get_available_extensions into ComputeNode method as well as other functions
|
||||
let rt = Runtime::new().unwrap();
|
||||
let copy_remote_storage = compute.ext_remote_storage.clone();
|
||||
|
||||
if let Some(remote_storage) = copy_remote_storage {
|
||||
rt.block_on(async move {
|
||||
get_availiable_extensions(&remote_storage, pgbin, None)
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
}
|
||||
rt.block_on(async {
|
||||
compute
|
||||
.get_available_extensions(None)
|
||||
.await
|
||||
.context("get_avilable_extensions error")
|
||||
})?;
|
||||
|
||||
if !spec_set {
|
||||
// No spec provided, hang waiting for it.
|
||||
@@ -241,17 +231,6 @@ fn main() -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
// Now we have the spec, so we request the tenant specific extensions
|
||||
// TODO: this is temporarily disabled
|
||||
// if let Some(tenant_id) = tenant_id {
|
||||
// let rt1 = Runtime::new().unwrap();
|
||||
// rt1.block_on(async {
|
||||
// download_extension(&ext_remote_storage, ExtensionType::Tenant(tenant_id), pgbin)
|
||||
// .await
|
||||
// .expect("download tenant specific extensions should not return an error");
|
||||
// });
|
||||
// }
|
||||
|
||||
// We got all we need, update the state.
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
|
||||
@@ -275,11 +254,16 @@ fn main() -> Result<()> {
|
||||
launch_configurator(&compute).expect("cannot launch configurator thread");
|
||||
|
||||
// download private tenant extensions before postgres start
|
||||
// TODO
|
||||
// compute_node.availiable_extensions = get_availiable_extensions(ext_remote_storage,
|
||||
// pg_version, //TODO
|
||||
// pgbin,
|
||||
// tenant_id); //TODO get tenant_id from spec
|
||||
// TODO (see Alek's attempt to do this below)
|
||||
// compute_node.available_extensions = get_available_extensions(ext_remote_storage,pg_version, pgbin,tenant_id);
|
||||
if tenant_id.is_some() {
|
||||
rt.block_on(async {
|
||||
compute
|
||||
.get_available_extensions(tenant_id)
|
||||
.await
|
||||
.context("get_available_extensions with tenant_id error")
|
||||
})?;
|
||||
}
|
||||
|
||||
// download preload shared libraries before postgres start (if any)
|
||||
// TODO
|
||||
|
||||
@@ -605,4 +605,12 @@ LIMIT 100",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_available_extensions(&self, tenant_id: Option<TenantId>) -> Result<()> {
|
||||
if let Some(remote_storage) = self.ext_remote_storage.as_ref() {
|
||||
extension_server::get_available_extensions(remote_storage, &self.pgbin, tenant_id)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ async fn download_helper(
|
||||
// if tenant_id is provided - search in a private per-tenant extension path,
|
||||
// otherwise - in public extension path
|
||||
//
|
||||
pub async fn get_availiable_extensions(
|
||||
pub async fn get_available_extensions(
|
||||
remote_storage: &GenericRemoteStorage,
|
||||
pgbin: &str,
|
||||
tenant_id: Option<TenantId>,
|
||||
|
||||
Reference in New Issue
Block a user