Code cleanup

This commit is contained in:
Anastasia Lubennikova
2023-06-27 15:29:33 +03:00
parent a2e154f07b
commit 7357b7cad5
3 changed files with 158 additions and 143 deletions

View File

@@ -42,7 +42,6 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use tokio::runtime::Runtime;
use tracing::{error, info};
use url::Url;
@@ -57,8 +56,6 @@ use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::extension_server::get_available_extensions;
const BUILD_TAG_DEFAULT: &str = "local";
fn main() -> Result<()> {
@@ -178,31 +175,10 @@ fn main() -> Result<()> {
let mut new_state = ComputeState::new();
let spec_set;
let tenant_id;
if let Some(spec) = spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
tenant_id = Some(pspec.tenant_id);
new_state.pspec = Some(pspec);
// fill in list of available extensions
let rt = Runtime::new().unwrap();
if let Some(ref ext_remote_storage) = ext_remote_storage {
new_state.extensions.available_extensions =
rt.block_on(get_available_extensions(&ext_remote_storage, pgbin, None))?;
// append private tenant extensions
let private_ext_list = rt.block_on(get_available_extensions(
&ext_remote_storage,
pgbin,
tenant_id,
))?;
new_state
.extensions
.available_extensions
.extend(private_ext_list);
}
spec_set = true;
} else {
spec_set = false;

View File

@@ -18,13 +18,11 @@ use compute_api::spec::{ComputeMode, ComputeSpec};
use remote_storage::GenericRemoteStorage;
use crate::extension_server::get_available_libraries;
use crate::extension_server::{get_available_extensions, get_available_libraries};
use crate::pg_helpers::*;
use crate::spec::*;
use crate::{config, extension_server};
use extension_server::ExtensionsState;
/// Compute node info shared across several `compute_ctl` threads.
pub struct ComputeNode {
// Url type maintains proper escaping
@@ -64,7 +62,6 @@ pub struct ComputeState {
pub error: Option<String>,
pub pspec: Option<ParsedSpec>,
pub metrics: ComputeMetrics,
pub extensions: ExtensionsState,
}
impl ComputeState {
@@ -76,7 +73,6 @@ impl ComputeState {
error: None,
pspec: None,
metrics: ComputeMetrics::default(),
extensions: ExtensionsState::new(),
}
}
}
@@ -520,7 +516,7 @@ impl ComputeNode {
#[instrument(skip(self))]
pub fn start_compute(&self, extension_server_port: u16) -> Result<std::process::Child> {
let mut compute_state = self.state.lock().unwrap().clone();
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
@@ -530,46 +526,7 @@ impl ComputeNode {
pspec.timeline_id,
);
// download preload shared libraries before postgres start (if any)
let spec = &pspec.spec;
let mut libs_vec = Vec::new();
info!("shared_preload_libraries is set to {:?}", libs_vec);
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
libs_vec = libs
.split(',')
.filter(|s| *s != "neon")
.map(str::to_string)
.collect();
}
// TEST ONLY!
libs_vec.push("test_ext1".to_string());
info!(
"shared_preload_libraries extra settings set to {:?}",
libs_vec
);
// download requested shared_preload_libraries and
// fill in list of available libraries
let rt = tokio::runtime::Runtime::new().unwrap();
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
let libs = rt.block_on(get_available_libraries(
&ext_remote_storage,
&self.pgbin,
pspec.tenant_id,
&libs_vec,
))?;
info!("available libs: {:?}", libs);
compute_state.extensions.available_libraries.extend(libs);
info!(
"cache available libraries: {:?}",
compute_state.extensions.available_libraries
);
}
self.prepare_external_extensions(&compute_state)?;
self.prepare_pgdata(&compute_state, extension_server_port)?;
@@ -708,17 +665,71 @@ LIMIT 100",
}
}
pub async fn download_extension_sql_files(&self, filename: String) -> Result<()> {
let state = self.state.lock().unwrap().clone();
let available_extensions = state.extensions.available_extensions;
// If remote extension storage is configured,
// download extension control files
// and shared preload libraries.
pub fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> {
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
// download preload shared libraries before postgres start (if any)
let spec = &pspec.spec;
// 1. parse private extension paths from spec
// TODO
let mut private_ext_prefixes = Vec::new();
if let Some(tenant_id) = spec.tenant_id {
private_ext_prefixes.push(tenant_id.to_string());
}
// 2. parse shared_preload_libraries from spec
let mut libs_vec = Vec::new();
info!("shared_preload_libraries is set to {:?}", libs_vec);
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
libs_vec = libs
.split(',')
.filter(|s| *s != "neon")
.map(str::to_string)
.collect();
}
// TODO write a proper test for this
libs_vec.push("test_ext1".to_string());
info!(
"shared_preload_libraries extra settings set to {:?}",
libs_vec
);
// download extension control files & shared_preload_libraries
let rt = tokio::runtime::Runtime::new().unwrap();
let pgbin = self.pgbin.clone();
rt.block_on(async move {
get_available_extensions(ext_remote_storage, &pgbin, &private_ext_prefixes).await?;
get_available_libraries(
ext_remote_storage,
&pgbin,
&private_ext_prefixes,
&libs_vec,
)
.await?;
Ok::<(), anyhow::Error>(())
})?;
}
Ok(())
}
pub async fn download_extension_sql_files(&self, filename: String) -> Result<()> {
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
extension_server::download_extension_sql_files(
&filename,
&remote_storage,
&available_extensions,
remote_storage,
&self.pgbin,
)
.await
@@ -727,19 +738,11 @@ LIMIT 100",
}
pub async fn download_library_file(&self, filename: String) -> Result<()> {
let state = self.state.lock().unwrap().clone();
let available_libraries = state.extensions.available_libraries;
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
extension_server::download_library_file(
&filename,
&available_libraries,
&remote_storage,
&self.pgbin,
)
.await
extension_server::download_library_file(&filename, remote_storage, &self.pgbin)
.await
}
}
}

View File

@@ -8,22 +8,6 @@ use std::path::{Path, PathBuf};
use std::str;
use tokio::io::AsyncReadExt;
use tracing::info;
use utils::id::TenantId;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExtensionsState {
pub available_extensions: Vec<RemotePath>,
pub available_libraries: Vec<RemotePath>,
}
impl ExtensionsState {
pub fn new() -> Self {
ExtensionsState {
available_extensions: Vec::new(),
available_libraries: Vec::new(),
}
}
}
fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
@@ -76,44 +60,57 @@ async fn download_helper(
// download extension control files
//
// return list of all extension files to use it in the future searches
//
// if tenant_id is provided - search in a private per-tenant extension path,
// otherwise - in public extension path
// if private_ext_prefixes is provided - search also in private extension paths
//
pub async fn get_available_extensions(
remote_storage: &GenericRemoteStorage,
pgbin: &str,
tenant_id: Option<TenantId>,
) -> anyhow::Result<Vec<RemotePath>> {
private_ext_prefixes: &Vec<String>,
) -> anyhow::Result<()> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let pg_version = get_pg_version(pgbin);
let remote_sharedir = match tenant_id {
None => RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension"))?,
Some(tenant_id) => RemotePath::new(
// &Path::new(&pg_version)
// .join(&tenant_id.to_string())
// .join("share/postgresql/extension"),
Path::new(&tenant_id.to_string()),
)?,
};
// 1. Download public extension control files
let remote_sharedir =
RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension"))?;
let from_paths: Vec<RemotePath> = remote_storage.list_files(Some(&remote_sharedir)).await?;
info!(
"get_available_extensions remote_sharedir: {:?}, local_sharedir: {:?}",
remote_sharedir, local_sharedir
);
let from_paths = remote_storage.list_files(Some(&remote_sharedir)).await?;
// download all found control files
for remote_from_path in &from_paths {
if remote_from_path.extension() == Some("control") {
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
download_helper(remote_storage, remote_from_path, &local_sharedir).await?;
}
}
Ok(from_paths)
// 2. Download private extension control files
for private_prefix in private_ext_prefixes {
let remote_sharedir_private = RemotePath::new(
&Path::new(&pg_version)
.join(private_prefix)
.join("share/postgresql/extension"),
)?;
let from_paths_private: Vec<RemotePath> = remote_storage
.list_files(Some(&remote_sharedir_private))
.await?;
info!(
"get_available_extensions remote_sharedir_private: {:?}, local_sharedir: {:?}",
remote_sharedir_private, local_sharedir
);
// download all found private control files
for remote_from_path in &from_paths_private {
if remote_from_path.extension() == Some("control") {
download_helper(remote_storage, remote_from_path, &local_sharedir).await?;
}
}
}
Ok(())
}
// Download requested shared_preload_libraries
@@ -125,22 +122,21 @@ pub async fn get_available_extensions(
pub async fn get_available_libraries(
remote_storage: &GenericRemoteStorage,
pgbin: &str,
_tenant_id: TenantId,
private_ext_prefixes: &Vec<String>,
preload_libraries: &Vec<String>,
) -> anyhow::Result<Vec<RemotePath>> {
let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
) -> anyhow::Result<()> {
// Return early if there are no libraries to download
if preload_libraries.is_empty() {
return Ok(());
}
let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
let pg_version = get_pg_version(pgbin);
let remote_libdir = RemotePath::new(&Path::new(&pg_version).join("lib/")).unwrap();
// 1. Download public libraries
let available_libraries = remote_storage.list_files(Some(&remote_libdir)).await?;
// TODO list private libraries as well
//
// let remote_libdir_private = RemotePath::new(&Path::new(&pg_version).join(tenant_id.to_string()).join("lib/")).unwrap();
// let available_libraries_private = remote_storage.list_files(Some(&remote_libdir_private)).await?;
// available_libraries.extend(available_libraries_private);
info!("list of library files {:?}", &available_libraries);
// download all requested libraries
@@ -163,15 +159,57 @@ pub async fn get_available_libraries(
.find(|lib: &&RemotePath| lib.object_name().unwrap() == lib_name_with_ext);
match lib_path {
// TODO don't panic here,
// remember error and return it only if library is not found in any prefix
None => bail!("Shared library file {lib_name} is not found in the extension store"),
Some(lib_path) => {
download_helper(remote_storage, &lib_path, &local_libdir).await?;
download_helper(remote_storage, lib_path, &local_libdir).await?;
info!("downloaded library {:?}", &lib_path);
}
}
}
return Ok(available_libraries);
// 2. Download private libraries
for private_prefix in private_ext_prefixes {
let remote_libdir_private =
RemotePath::new(&Path::new(&pg_version).join(private_prefix).join("lib/")).unwrap();
let available_libraries_private = remote_storage
.list_files(Some(&remote_libdir_private))
.await?;
info!("list of library files {:?}", &available_libraries_private);
// download all requested libraries
// add file extension if it isn't in the filename
//
// TODO refactor this code to avoid duplication
for lib_name in preload_libraries {
let lib_name_with_ext = if !lib_name.ends_with(".so") {
lib_name.to_owned() + ".so"
} else {
lib_name.to_string()
};
info!("looking for library {:?}", &lib_name_with_ext);
for lib in available_libraries_private.iter() {
info!("object_name {}", lib.object_name().unwrap());
}
let lib_path = available_libraries_private
.iter()
.find(|lib: &&RemotePath| lib.object_name().unwrap() == lib_name_with_ext);
match lib_path {
None => bail!("Shared library file {lib_name} is not found in the extension store"),
Some(lib_path) => {
download_helper(remote_storage, lib_path, &local_libdir).await?;
info!("downloaded library {:?}", &lib_path);
}
}
}
}
Ok(())
}
// download all sql files for a given extension name
@@ -179,11 +217,15 @@ pub async fn get_available_libraries(
pub async fn download_extension_sql_files(
ext_name: &str,
remote_storage: &GenericRemoteStorage,
available_extensions: &Vec<RemotePath>,
pgbin: &str,
) -> Result<()> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let pg_version = get_pg_version(pgbin);
let remote_sharedir: RemotePath =
RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension"))?;
let available_extensions = remote_storage.list_files(Some(&remote_sharedir)).await?;
info!(
"list of available_extension files {:?}",
&available_extensions
@@ -202,7 +244,7 @@ pub async fn download_extension_sql_files(
}
for remote_from_path in files_to_download {
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
download_helper(remote_storage, remote_from_path, &local_sharedir).await?;
}
Ok(())
@@ -211,7 +253,6 @@ pub async fn download_extension_sql_files(
// download shared library file
pub async fn download_library_file(
lib_name: &str,
available_libraries: &Vec<RemotePath>,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
) -> Result<()> {
@@ -220,11 +261,6 @@ pub async fn download_library_file(
let pg_version = get_pg_version(pgbin);
let remote_libdir = RemotePath::new(&Path::new(&pg_version).join("lib/")).unwrap();
info!(
"cached list of available_libraries files {:?}",
&available_libraries
);
// TODO cache available_libraries list on the first read to avoid unneeded s3 calls
let available_libraries = remote_storage.list_files(Some(&remote_libdir)).await?;
info!("list of library files {:?}", &available_libraries);
@@ -237,7 +273,7 @@ pub async fn download_library_file(
match lib {
None => bail!("Shared library file {lib_name} is not found in the extension store"),
Some(lib) => {
download_helper(remote_storage, &lib, &local_libdir).await?;
download_helper(remote_storage, lib, &local_libdir).await?;
}
}