diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 708aab4a56..7fbfcad256 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -49,7 +49,9 @@ use compute_api::responses::ComputeStatus; use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec}; use compute_tools::configurator::launch_configurator; -use compute_tools::extension_server::{download_extension, init_remote_storage, ExtensionType}; +use compute_tools::extension_server::{ + download_extension, get_availiable_extensions, init_remote_storage, +}; use compute_tools::http::api::launch_http_server; use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; @@ -78,11 +80,12 @@ fn main() -> Result<()> { let rt = Runtime::new().unwrap(); let copy_remote_storage = ext_remote_storage.clone(); - rt.block_on(async move { - download_extension(©_remote_storage, ExtensionType::Shared, pgbin) - .await - .expect("download extension should work"); - }); + + // rt.block_on(async move { + // download_extension(©_remote_storage, ExtensionType::Shared, pgbin) + // .await + // .expect("download extension should work"); + // }); let http_port = *matches .get_one::("http-port") @@ -199,6 +202,8 @@ fn main() -> Result<()> { state: Mutex::new(new_state), state_changed: Condvar::new(), ext_remote_storage, + availiable_extensions: Vec::new(), + availiable_libraries: Vec::new(), }; let compute = Arc::new(compute_node); @@ -209,6 +214,19 @@ fn main() -> Result<()> { let extension_server_port: u16 = http_port; + // exen before we have spec, we can get public availiable extensions + // TODO turn get_availiable_extensions() & other functions into ComputeNode method, + // we pass to many params from it anyways.. + + compute_node.availiable_extensions = get_availiable_extensions( + ext_remote_storage, + pg_version, //TODO + pgbin, + None, + ); + + // TODO same for libraries + if !spec_set { // No spec provided, hang waiting for it. info!("no compute spec provided, waiting"); @@ -246,6 +264,17 @@ fn main() -> Result<()> { let _configurator_handle = launch_configurator(&compute).expect("cannot launch configurator thread"); + // download private tenant extensions before postgres start + // TODO + // compute_node.availiable_extensions = get_availiable_extensions(ext_remote_storage, + // pg_version, //TODO + // pgbin, + // tenant_id); //TODO get tenant_id from spec + + // download preload shared libraries before postgres start (if any) + // TODO + // download_library_file(); + // Start Postgres let mut delay_exit = false; let mut exit_code = None; diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 8cf030f722..605cb17410 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -16,7 +16,7 @@ use utils::lsn::Lsn; use compute_api::responses::{ComputeMetrics, ComputeStatus}; use compute_api::spec::{ComputeMode, ComputeSpec}; -use remote_storage::GenericRemoteStorage; +use remote_storage::{GenericRemoteStorage, RemotePath}; use crate::config; use crate::pg_helpers::*; @@ -49,6 +49,8 @@ pub struct ComputeNode { pub state_changed: Condvar, /// S3 extensions configuration variables pub ext_remote_storage: Option, + pub availiable_extensions: Vec, + pub availiable_libraries: Vec, } #[derive(Clone, Debug)] diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs index ae2b129ac8..9fcb7d74d9 100644 --- a/compute_tools/src/extension_server.rs +++ b/compute_tools/src/extension_server.rs @@ -1,4 +1,4 @@ -use anyhow::{self, bail}; +use anyhow::{self, bail, Result}; use remote_storage::*; use serde_json::{self, Value}; use std::fs::File; @@ -8,6 +8,7 @@ use std::path::{Path, PathBuf}; use std::str; use tokio::io::AsyncReadExt; use tracing::info; +use utils::id::TenantId; fn get_pg_config(argument: &str, pgbin: &str) -> String { // gives the result of `pg_config [argument]` @@ -55,60 +56,92 @@ async fn download_helper( Ok(()) } -pub enum ExtensionType { - Shared, // we just use the public folder here - Tenant(String), // String is tenant_id - Library(String), // String is name of the extension -} - -pub async fn download_extension( - remote_storage: &Option, - ext_type: ExtensionType, +// download extension control files +// +// return list of all extension files to use it in the future searches +// +// if tenant_id is provided - search in a private per-tenant extension path, +// otherwise - in public extension path +// +pub async fn get_availiable_extensions( + remote_storage: &GenericRemoteStorage, + pg_version: &str, pgbin: &str, -) -> anyhow::Result<()> { - let remote_storage = match remote_storage { - Some(remote_storage) => remote_storage, - None => return Ok(()), + tenant_id: Option, +) -> anyhow::Result> { + let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"); + + let remote_sharedir = match tenant_id { + None => RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension"))?, + Some(tenant_id) => RemotePath::new( + &Path::new(&pg_version) + .join(&tenant_id.to_string()) + .join("share/postgresql/extension"), + )?, }; - let pg_version = get_pg_version(pgbin); - match ext_type { - ExtensionType::Shared => { - // 1. Download control files from s3-bucket/public/*.control to SHAREDIR/extension - // We can do this step even before we have spec, - // because public extensions are common for all projects. - let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"); - let remote_sharedir = Path::new(&pg_version).join("share/postgresql/extension"); - let remote_sharedir = RemotePath::new(Path::new(&remote_sharedir))?; - let from_paths = remote_storage.list_files(Some(&remote_sharedir)).await?; - for remote_from_path in from_paths { - if remote_from_path.extension() == Some("control") { - download_helper(remote_storage, &remote_from_path, &local_sharedir).await?; - } - } - } - ExtensionType::Tenant(tenant_id) => { - // 2. After we have spec, before project start - // Download control files from s3-bucket/[tenant-id]/*.control to SHAREDIR/extension - let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"); - let remote_path = RemotePath::new(Path::new(&tenant_id.to_string()))?; - let from_paths = remote_storage.list_files(Some(&remote_path)).await?; - for remote_from_path in from_paths { - if remote_from_path.extension() == Some("control") { - download_helper(remote_storage, &remote_from_path, &local_sharedir).await?; - } - } - } - ExtensionType::Library(library_name) => { - // 3. After we have spec, before postgres start - // Download preload_shared_libraries from s3-bucket/public/[library-name].control into LIBDIR/ + let from_paths = remote_storage.list_files(Some(&remote_sharedir)).await?; - let local_libdir: PathBuf = Path::new(&get_pg_config("--libdir", pgbin)).into(); - let remote_path = format!("{library_name}.control"); - let remote_from_path = RemotePath::new(Path::new(&remote_path))?; - download_helper(remote_storage, &remote_from_path, &local_libdir).await?; + // download all found control files + for remote_from_path in &from_paths { + if remote_from_path.extension() == Some("control") { + download_helper(remote_storage, &remote_from_path, &local_sharedir).await?; } } + + Ok(from_paths) +} + +// download all sql files for a given extension name +// +pub async fn download_extension_sql_files( + ext_name: &str, + availiable_extensions: Vec, + remote_storage: &GenericRemoteStorage, + pgbin: &str, +) -> Result<()> { + let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"); + + // check if extension files exist + let files_to_download: Vec<&RemotePath> = availiable_extensions + .iter() + .filter(|ext| { + ext.extension() == Some("sql") && ext.object_name().unwrap().starts_with(ext_name) + }) + .collect(); + + if files_to_download.is_empty() { + bail!("Files for extension {ext_name} are not found in the extension store"); + } + + for remote_from_path in files_to_download { + download_helper(remote_storage, &remote_from_path, &local_sharedir).await?; + } + + Ok(()) +} + +// download shared library file +pub async fn download_library_file( + lib_name: &str, + availiable_libraries: Vec, + remote_storage: &GenericRemoteStorage, + pgbin: &str, +) -> Result<()> { + let local_libdir: PathBuf = Path::new(&get_pg_config("--libdir", pgbin)).into(); + + // check if the library file exists + let lib = availiable_libraries + .iter() + .find(|lib: &&RemotePath| lib.object_name().unwrap() == lib_name); + + match lib { + None => bail!("Shared library file {lib_name} is not found in the extension store"), + Some(lib) => { + download_helper(remote_storage, &lib, &local_libdir).await?; + } + } + Ok(()) } diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index cffdbdbc4f..f7383d8062 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -16,7 +16,7 @@ use tokio::task; use tracing::{error, info}; use tracing_utils::http::OtelName; -use crate::extension_server::{self, ExtensionType}; +use crate::extension_server::{download_extension_sql_files, download_library_file}; fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse { ComputeStatusResponse { @@ -127,6 +127,8 @@ async fn routes(req: Request, compute: &Arc) -> Response { info!("serving {:?} POST request", route); + let is_library = false; + let filename = route.split('/').last().unwrap(); info!( @@ -134,17 +136,47 @@ async fn routes(req: Request, compute: &Arc) -> Response Response::new(Body::from("OK")), - Err(e) => { - error!("download_extension failed: {}", e); - Response::new(Body::from(e.to_string())) + if compute.ext_remote_storage.is_none() { + error!("Remote extension storage is not set up"); + let mut resp = Response::new(Body::from("Remote extension storage is not set up")); + *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + return resp; + } + let ext_storage = &compute.ext_remote_storage.unwrap(); + + if !is_library { + match download_extension_sql_files( + filename, + &compute.availiable_extensions, + &ext_storage, + &compute.pgbin, + ) + .await + { + Ok(_) => Response::new(Body::from("OK")), + Err(e) => { + error!("extension download failed: {}", e); + let mut resp = Response::new(Body::from(e.to_string())); + *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + resp + } + } + } else { + match download_library_file( + filename, + &compute.availiable_libraries, + &ext_storage, + &compute.pgbin, + ) + .await + { + Ok(_) => Response::new(Body::from("OK")), + Err(e) => { + error!("library download failed: {}", e); + let mut resp = Response::new(Body::from(e.to_string())); + *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + resp + } } } }