From 7357b7cad5b1ff0caee868554d23064ae7284941 Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Tue, 27 Jun 2023 15:29:33 +0300 Subject: [PATCH] Code cleanup --- compute_tools/src/bin/compute_ctl.rs | 26 +---- compute_tools/src/compute.rs | 125 ++++++++++----------- compute_tools/src/extension_server.rs | 150 ++++++++++++++++---------- 3 files changed, 158 insertions(+), 143 deletions(-) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index b468f915b9..bcc8ec7844 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -42,7 +42,6 @@ use std::{thread, time::Duration}; use anyhow::{Context, Result}; use chrono::Utc; use clap::Arg; -use tokio::runtime::Runtime; use tracing::{error, info}; use url::Url; @@ -57,8 +56,6 @@ use compute_tools::monitor::launch_monitor; use compute_tools::params::*; use compute_tools::spec::*; -use compute_tools::extension_server::get_available_extensions; - const BUILD_TAG_DEFAULT: &str = "local"; fn main() -> Result<()> { @@ -178,31 +175,10 @@ fn main() -> Result<()> { let mut new_state = ComputeState::new(); let spec_set; - let tenant_id; + if let Some(spec) = spec { let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?; - tenant_id = Some(pspec.tenant_id); new_state.pspec = Some(pspec); - - // fill in list of available extensions - let rt = Runtime::new().unwrap(); - - if let Some(ref ext_remote_storage) = ext_remote_storage { - new_state.extensions.available_extensions = - rt.block_on(get_available_extensions(&ext_remote_storage, pgbin, None))?; - - // append private tenant extensions - let private_ext_list = rt.block_on(get_available_extensions( - &ext_remote_storage, - pgbin, - tenant_id, - ))?; - new_state - .extensions - .available_extensions - .extend(private_ext_list); - } - spec_set = true; } else { spec_set = false; diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index d9e6b5c905..a6d12df172 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -18,13 +18,11 @@ use compute_api::spec::{ComputeMode, ComputeSpec}; use remote_storage::GenericRemoteStorage; -use crate::extension_server::get_available_libraries; +use crate::extension_server::{get_available_extensions, get_available_libraries}; use crate::pg_helpers::*; use crate::spec::*; use crate::{config, extension_server}; -use extension_server::ExtensionsState; - /// Compute node info shared across several `compute_ctl` threads. pub struct ComputeNode { // Url type maintains proper escaping @@ -64,7 +62,6 @@ pub struct ComputeState { pub error: Option, pub pspec: Option, pub metrics: ComputeMetrics, - pub extensions: ExtensionsState, } impl ComputeState { @@ -76,7 +73,6 @@ impl ComputeState { error: None, pspec: None, metrics: ComputeMetrics::default(), - extensions: ExtensionsState::new(), } } } @@ -520,7 +516,7 @@ impl ComputeNode { #[instrument(skip(self))] pub fn start_compute(&self, extension_server_port: u16) -> Result { - let mut compute_state = self.state.lock().unwrap().clone(); + let compute_state = self.state.lock().unwrap().clone(); let pspec = compute_state.pspec.as_ref().expect("spec must be set"); info!( "starting compute for project {}, operation {}, tenant {}, timeline {}", @@ -530,46 +526,7 @@ impl ComputeNode { pspec.timeline_id, ); - // download preload shared libraries before postgres start (if any) - let spec = &pspec.spec; - let mut libs_vec = Vec::new(); - - info!("shared_preload_libraries is set to {:?}", libs_vec); - - if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") { - libs_vec = libs - .split(',') - .filter(|s| *s != "neon") - .map(str::to_string) - .collect(); - } - - // TEST ONLY! - libs_vec.push("test_ext1".to_string()); - info!( - "shared_preload_libraries extra settings set to {:?}", - libs_vec - ); - - // download requested shared_preload_libraries and - // fill in list of available libraries - let rt = tokio::runtime::Runtime::new().unwrap(); - - if let Some(ref ext_remote_storage) = self.ext_remote_storage { - let libs = rt.block_on(get_available_libraries( - &ext_remote_storage, - &self.pgbin, - pspec.tenant_id, - &libs_vec, - ))?; - - info!("available libs: {:?}", libs); - compute_state.extensions.available_libraries.extend(libs); - info!( - "cache available libraries: {:?}", - compute_state.extensions.available_libraries - ); - } + self.prepare_external_extensions(&compute_state)?; self.prepare_pgdata(&compute_state, extension_server_port)?; @@ -708,17 +665,71 @@ LIMIT 100", } } - pub async fn download_extension_sql_files(&self, filename: String) -> Result<()> { - let state = self.state.lock().unwrap().clone(); - let available_extensions = state.extensions.available_extensions; + // If remote extension storage is configured, + // download extension control files + // and shared preload libraries. + pub fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> { + if let Some(ref ext_remote_storage) = self.ext_remote_storage { + let pspec = compute_state.pspec.as_ref().expect("spec must be set"); + // download preload shared libraries before postgres start (if any) + let spec = &pspec.spec; + // 1. parse private extension paths from spec + // TODO + let mut private_ext_prefixes = Vec::new(); + + if let Some(tenant_id) = spec.tenant_id { + private_ext_prefixes.push(tenant_id.to_string()); + } + + // 2. parse shared_preload_libraries from spec + let mut libs_vec = Vec::new(); + info!("shared_preload_libraries is set to {:?}", libs_vec); + + if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") { + libs_vec = libs + .split(',') + .filter(|s| *s != "neon") + .map(str::to_string) + .collect(); + } + + // TODO write a proper test for this + libs_vec.push("test_ext1".to_string()); + info!( + "shared_preload_libraries extra settings set to {:?}", + libs_vec + ); + + // download extension control files & shared_preload_libraries + let rt = tokio::runtime::Runtime::new().unwrap(); + + let pgbin = self.pgbin.clone(); + rt.block_on(async move { + get_available_extensions(ext_remote_storage, &pgbin, &private_ext_prefixes).await?; + + get_available_libraries( + ext_remote_storage, + &pgbin, + &private_ext_prefixes, + &libs_vec, + ) + .await?; + + Ok::<(), anyhow::Error>(()) + })?; + } + + Ok(()) + } + + pub async fn download_extension_sql_files(&self, filename: String) -> Result<()> { match &self.ext_remote_storage { None => anyhow::bail!("No remote extension storage"), Some(remote_storage) => { extension_server::download_extension_sql_files( &filename, - &remote_storage, - &available_extensions, + remote_storage, &self.pgbin, ) .await @@ -727,19 +738,11 @@ LIMIT 100", } pub async fn download_library_file(&self, filename: String) -> Result<()> { - let state = self.state.lock().unwrap().clone(); - let available_libraries = state.extensions.available_libraries; - match &self.ext_remote_storage { None => anyhow::bail!("No remote extension storage"), Some(remote_storage) => { - extension_server::download_library_file( - &filename, - &available_libraries, - &remote_storage, - &self.pgbin, - ) - .await + extension_server::download_library_file(&filename, remote_storage, &self.pgbin) + .await } } } diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs index f73cca27fc..cb9f318214 100644 --- a/compute_tools/src/extension_server.rs +++ b/compute_tools/src/extension_server.rs @@ -8,22 +8,6 @@ use std::path::{Path, PathBuf}; use std::str; use tokio::io::AsyncReadExt; use tracing::info; -use utils::id::TenantId; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ExtensionsState { - pub available_extensions: Vec, - pub available_libraries: Vec, -} - -impl ExtensionsState { - pub fn new() -> Self { - ExtensionsState { - available_extensions: Vec::new(), - available_libraries: Vec::new(), - } - } -} fn get_pg_config(argument: &str, pgbin: &str) -> String { // gives the result of `pg_config [argument]` @@ -76,44 +60,57 @@ async fn download_helper( // download extension control files // -// return list of all extension files to use it in the future searches -// -// if tenant_id is provided - search in a private per-tenant extension path, -// otherwise - in public extension path +// if private_ext_prefixes is provided - search also in private extension paths // pub async fn get_available_extensions( remote_storage: &GenericRemoteStorage, pgbin: &str, - tenant_id: Option, -) -> anyhow::Result> { + private_ext_prefixes: &Vec, +) -> anyhow::Result<()> { let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"); let pg_version = get_pg_version(pgbin); - let remote_sharedir = match tenant_id { - None => RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension"))?, - Some(tenant_id) => RemotePath::new( - // &Path::new(&pg_version) - // .join(&tenant_id.to_string()) - // .join("share/postgresql/extension"), - Path::new(&tenant_id.to_string()), - )?, - }; + // 1. Download public extension control files + let remote_sharedir = + RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension"))?; + let from_paths: Vec = remote_storage.list_files(Some(&remote_sharedir)).await?; info!( "get_available_extensions remote_sharedir: {:?}, local_sharedir: {:?}", remote_sharedir, local_sharedir ); - let from_paths = remote_storage.list_files(Some(&remote_sharedir)).await?; - - // download all found control files for remote_from_path in &from_paths { if remote_from_path.extension() == Some("control") { - download_helper(remote_storage, &remote_from_path, &local_sharedir).await?; + download_helper(remote_storage, remote_from_path, &local_sharedir).await?; } } - Ok(from_paths) + // 2. Download private extension control files + for private_prefix in private_ext_prefixes { + let remote_sharedir_private = RemotePath::new( + &Path::new(&pg_version) + .join(private_prefix) + .join("share/postgresql/extension"), + )?; + let from_paths_private: Vec = remote_storage + .list_files(Some(&remote_sharedir_private)) + .await?; + + info!( + "get_available_extensions remote_sharedir_private: {:?}, local_sharedir: {:?}", + remote_sharedir_private, local_sharedir + ); + + // download all found private control files + for remote_from_path in &from_paths_private { + if remote_from_path.extension() == Some("control") { + download_helper(remote_storage, remote_from_path, &local_sharedir).await?; + } + } + } + + Ok(()) } // Download requested shared_preload_libraries @@ -125,22 +122,21 @@ pub async fn get_available_extensions( pub async fn get_available_libraries( remote_storage: &GenericRemoteStorage, pgbin: &str, - _tenant_id: TenantId, + private_ext_prefixes: &Vec, preload_libraries: &Vec, -) -> anyhow::Result> { - let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into(); +) -> anyhow::Result<()> { + // Return early if there are no libraries to download + if preload_libraries.is_empty() { + return Ok(()); + } + let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into(); let pg_version = get_pg_version(pgbin); let remote_libdir = RemotePath::new(&Path::new(&pg_version).join("lib/")).unwrap(); + // 1. Download public libraries + let available_libraries = remote_storage.list_files(Some(&remote_libdir)).await?; - - // TODO list private libraries as well - // - // let remote_libdir_private = RemotePath::new(&Path::new(&pg_version).join(tenant_id.to_string()).join("lib/")).unwrap(); - // let available_libraries_private = remote_storage.list_files(Some(&remote_libdir_private)).await?; - // available_libraries.extend(available_libraries_private); - info!("list of library files {:?}", &available_libraries); // download all requested libraries @@ -163,15 +159,57 @@ pub async fn get_available_libraries( .find(|lib: &&RemotePath| lib.object_name().unwrap() == lib_name_with_ext); match lib_path { + // TODO don't panic here, + // remember error and return it only if library is not found in any prefix None => bail!("Shared library file {lib_name} is not found in the extension store"), Some(lib_path) => { - download_helper(remote_storage, &lib_path, &local_libdir).await?; + download_helper(remote_storage, lib_path, &local_libdir).await?; info!("downloaded library {:?}", &lib_path); } } } - return Ok(available_libraries); + // 2. Download private libraries + for private_prefix in private_ext_prefixes { + let remote_libdir_private = + RemotePath::new(&Path::new(&pg_version).join(private_prefix).join("lib/")).unwrap(); + let available_libraries_private = remote_storage + .list_files(Some(&remote_libdir_private)) + .await?; + info!("list of library files {:?}", &available_libraries_private); + + // download all requested libraries + // add file extension if it isn't in the filename + // + // TODO refactor this code to avoid duplication + for lib_name in preload_libraries { + let lib_name_with_ext = if !lib_name.ends_with(".so") { + lib_name.to_owned() + ".so" + } else { + lib_name.to_string() + }; + + info!("looking for library {:?}", &lib_name_with_ext); + + for lib in available_libraries_private.iter() { + info!("object_name {}", lib.object_name().unwrap()); + } + + let lib_path = available_libraries_private + .iter() + .find(|lib: &&RemotePath| lib.object_name().unwrap() == lib_name_with_ext); + + match lib_path { + None => bail!("Shared library file {lib_name} is not found in the extension store"), + Some(lib_path) => { + download_helper(remote_storage, lib_path, &local_libdir).await?; + info!("downloaded library {:?}", &lib_path); + } + } + } + } + + Ok(()) } // download all sql files for a given extension name @@ -179,11 +217,15 @@ pub async fn get_available_libraries( pub async fn download_extension_sql_files( ext_name: &str, remote_storage: &GenericRemoteStorage, - available_extensions: &Vec, pgbin: &str, ) -> Result<()> { let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"); + let pg_version = get_pg_version(pgbin); + let remote_sharedir: RemotePath = + RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension"))?; + let available_extensions = remote_storage.list_files(Some(&remote_sharedir)).await?; + info!( "list of available_extension files {:?}", &available_extensions @@ -202,7 +244,7 @@ pub async fn download_extension_sql_files( } for remote_from_path in files_to_download { - download_helper(remote_storage, &remote_from_path, &local_sharedir).await?; + download_helper(remote_storage, remote_from_path, &local_sharedir).await?; } Ok(()) @@ -211,7 +253,6 @@ pub async fn download_extension_sql_files( // download shared library file pub async fn download_library_file( lib_name: &str, - available_libraries: &Vec, remote_storage: &GenericRemoteStorage, pgbin: &str, ) -> Result<()> { @@ -220,11 +261,6 @@ pub async fn download_library_file( let pg_version = get_pg_version(pgbin); let remote_libdir = RemotePath::new(&Path::new(&pg_version).join("lib/")).unwrap(); - info!( - "cached list of available_libraries files {:?}", - &available_libraries - ); - // TODO cache available_libraries list on the first read to avoid unneeded s3 calls let available_libraries = remote_storage.list_files(Some(&remote_libdir)).await?; info!("list of library files {:?}", &available_libraries); @@ -237,7 +273,7 @@ pub async fn download_library_file( match lib { None => bail!("Shared library file {lib_name} is not found in the extension store"), Some(lib) => { - download_helper(remote_storage, &lib, &local_libdir).await?; + download_helper(remote_storage, lib, &local_libdir).await?; } }