diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index fe08c192d5..490df8aef2 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -57,6 +57,8 @@ use compute_tools::monitor::launch_monitor; use compute_tools::params::*; use compute_tools::spec::*; +use compute_tools::extension_server::get_available_extensions; + const BUILD_TAG_DEFAULT: &str = "local"; fn main() -> Result<()> { @@ -181,6 +183,24 @@ fn main() -> Result<()> { let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?; tenant_id = Some(pspec.tenant_id); new_state.pspec = Some(pspec); + + // fill in list of available extensions + let rt = Runtime::new().unwrap(); + + if let Some(ref ext_remote_storage) = ext_remote_storage { + new_state.extensions.available_extensions = + rt.block_on(get_available_extensions(&ext_remote_storage, pgbin, None))?; + } + + // append private tenant extensions + // TODO not implemented yet + // let private_ext_list = rt.block_on(get_available_extensions( + // &ext_remote_storage, + // pgbin, + // tenant_id, + // ))?; + // new_state.extensions.available_extensions.extend(private_ext_list); + spec_set = true; } else { spec_set = false; @@ -194,8 +214,6 @@ fn main() -> Result<()> { state: Mutex::new(new_state), state_changed: Condvar::new(), ext_remote_storage, - availiable_extensions: Vec::new(), - availiable_libraries: Vec::new(), }; let compute = Arc::new(compute_node); @@ -206,17 +224,6 @@ fn main() -> Result<()> { let extension_server_port: u16 = http_port; - // exen before we have spec, we can get public availiable extensions - // TODO maybe convert get_available_extensions into ComputeNode method as well as other functions - let rt = Runtime::new().unwrap(); - rt.block_on(async { - compute - .get_available_extensions(None) - .await - .context("get_avilable_extensions error") - })?; - drop(rt); - if !spec_set { // No spec provided, hang waiting for it. info!("no compute spec provided, waiting"); @@ -254,22 +261,6 @@ fn main() -> Result<()> { let _configurator_handle = launch_configurator(&compute).expect("cannot launch configurator thread"); - // download private tenant extensions before postgres start - // TODO (see Alek's attempt to do this below) - // compute_node.available_extensions = get_available_extensions(ext_remote_storage,pg_version, pgbin,tenant_id); - // if tenant_id.is_some() { - // rt.block_on(async { - // compute - // .get_available_extensions(tenant_id) - // .await - // .context("get_available_extensions with tenant_id error") - // })?; - // } - - // download preload shared libraries before postgres start (if any) - // TODO - // download_library_file(); - // Start Postgres let mut delay_exit = false; let mut exit_code = None; diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index af29a9ee51..d9f8db81b1 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -16,12 +16,15 @@ use utils::lsn::Lsn; use compute_api::responses::{ComputeMetrics, ComputeStatus}; use compute_api::spec::{ComputeMode, ComputeSpec}; -use remote_storage::{GenericRemoteStorage, RemotePath}; +use remote_storage::GenericRemoteStorage; +use crate::extension_server::get_available_libraries; use crate::pg_helpers::*; use crate::spec::*; use crate::{config, extension_server}; +use extension_server::ExtensionsState; + /// Compute node info shared across several `compute_ctl` threads. pub struct ComputeNode { // Url type maintains proper escaping @@ -49,8 +52,6 @@ pub struct ComputeNode { pub state_changed: Condvar, /// S3 extensions configuration variables pub ext_remote_storage: Option, - pub availiable_extensions: Vec, - pub availiable_libraries: Vec, } #[derive(Clone, Debug)] @@ -63,6 +64,7 @@ pub struct ComputeState { pub error: Option, pub pspec: Option, pub metrics: ComputeMetrics, + pub extensions: ExtensionsState, } impl ComputeState { @@ -74,6 +76,7 @@ impl ComputeState { error: None, pspec: None, metrics: ComputeMetrics::default(), + extensions: ExtensionsState::new(), } } } @@ -435,7 +438,7 @@ impl ComputeNode { #[instrument(skip(self))] pub fn start_compute(&self, extension_server_port: u16) -> Result { - let compute_state = self.state.lock().unwrap().clone(); + let mut compute_state = self.state.lock().unwrap().clone(); let pspec = compute_state.pspec.as_ref().expect("spec must be set"); info!( "starting compute for project {}, operation {}, tenant {}, timeline {}", @@ -445,6 +448,35 @@ impl ComputeNode { pspec.timeline_id, ); + // download preload shared libraries before postgres start (if any) + let spec = &pspec.spec; + let mut libs_vec = Vec::new(); + + if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") { + libs_vec = libs + .split(',') + .filter(|s| *s != "neon") + .map(str::to_string) + .collect(); + } + + info!("shared_preload_libraries is set to {:?}", libs_vec); + + // download requested shared_preload_libraries and + // fill in list of available libraries + let rt = tokio::runtime::Runtime::new().unwrap(); + + if let Some(ref ext_remote_storage) = self.ext_remote_storage { + let libs = rt.block_on(get_available_libraries( + &ext_remote_storage, + &self.pgbin, + pspec.tenant_id, + &libs_vec, + ))?; + + compute_state.extensions.available_libraries.extend(libs); + } + self.prepare_pgdata(&compute_state, extension_server_port)?; let start_time = Utc::now(); @@ -583,12 +615,16 @@ LIMIT 100", } pub async fn download_extension_sql_files(&self, filename: String) -> Result<()> { + let state = self.state.lock().unwrap().clone(); + let available_extensions = state.extensions.available_extensions; + match &self.ext_remote_storage { None => anyhow::bail!("No remote extension storage"), Some(remote_storage) => { extension_server::download_extension_sql_files( &filename, &remote_storage, + &available_extensions, &self.pgbin, ) .await @@ -597,20 +633,20 @@ LIMIT 100", } pub async fn download_library_file(&self, filename: String) -> Result<()> { + let state = self.state.lock().unwrap().clone(); + let available_libraries = state.extensions.available_libraries; + match &self.ext_remote_storage { None => anyhow::bail!("No remote extension storage"), Some(remote_storage) => { - extension_server::download_library_file(&filename, &remote_storage, &self.pgbin) - .await + extension_server::download_library_file( + &filename, + &available_libraries, + &remote_storage, + &self.pgbin, + ) + .await } } } - - pub async fn get_available_extensions(&self, tenant_id: Option) -> Result<()> { - if let Some(remote_storage) = self.ext_remote_storage.as_ref() { - extension_server::get_available_extensions(remote_storage, &self.pgbin, tenant_id) - .await?; - } - Ok(()) - } } diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs index f4860a18a7..f85bb34f1c 100644 --- a/compute_tools/src/extension_server.rs +++ b/compute_tools/src/extension_server.rs @@ -10,6 +10,21 @@ use tokio::io::AsyncReadExt; use tracing::info; use utils::id::TenantId; +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ExtensionsState { + pub available_extensions: Vec, + pub available_libraries: Vec, +} + +impl ExtensionsState { + pub fn new() -> Self { + ExtensionsState { + available_extensions: Vec::new(), + available_libraries: Vec::new(), + } + } +} + fn get_pg_config(argument: &str, pgbin: &str) -> String { // gives the result of `pg_config [argument]` // where argument is a flag like `--version` or `--sharedir` @@ -84,7 +99,7 @@ pub async fn get_available_extensions( }; info!( - "get_availiable_extensions remote_sharedir: {:?}, local_sharedir: {:?}", + "get_available_extensions remote_sharedir: {:?}, local_sharedir: {:?}", remote_sharedir, local_sharedir ); @@ -100,30 +115,68 @@ pub async fn get_available_extensions( Ok(from_paths) } +// Download requested shared_preload_libraries +// +// Note that tenant_id is not optional here, because we only download libraries +// after we know the tenant spec and the tenant_id. +// +// return list of all library files to use it in the future searches +pub async fn get_available_libraries( + remote_storage: &GenericRemoteStorage, + pgbin: &str, + _tenant_id: TenantId, + preload_libraries: &Vec, +) -> anyhow::Result> { + let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into(); + + let pg_version = get_pg_version(pgbin); + let remote_libdir = RemotePath::new(&Path::new(&pg_version).join("lib/")).unwrap(); + + let available_libraries = remote_storage.list_files(Some(&remote_libdir)).await?; + + // TODO list private libraries as well + // + // let remote_libdir_private = RemotePath::new(&Path::new(&pg_version).join(tenant_id.to_string()).join("lib/")).unwrap(); + // let available_libraries_private = remote_storage.list_files(Some(&remote_libdir_private)).await?; + // available_libraries.extend(available_libraries_private); + + info!("list of library files {:?}", &available_libraries); + + // download all requested libraries + for lib_name in preload_libraries { + let lib_path = available_libraries + .iter() + .find(|lib: &&RemotePath| lib.object_name().unwrap() == lib_name); + + match lib_path { + None => bail!("Shared library file {lib_name} is not found in the extension store"), + Some(lib_path) => { + download_helper(remote_storage, &lib_path, &local_libdir).await?; + info!("downloaded library {:?}", &lib_path); + } + } + } + + return Ok(available_libraries); +} + // download all sql files for a given extension name // pub async fn download_extension_sql_files( ext_name: &str, - //availiable_extensions: &Vec, remote_storage: &GenericRemoteStorage, + available_extensions: &Vec, pgbin: &str, ) -> Result<()> { let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"); - let pg_version = get_pg_version(pgbin); - let remote_sharedir = - RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension")).unwrap(); - - // TODO cache availiable_extensions list on the first read to avoid unneeded s3 calls - let availiable_extensions = remote_storage.list_files(Some(&remote_sharedir)).await?; - info!( - "list of availiable_extension files {:?}", - &availiable_extensions + "list of available_extension files {:?}", + &available_extensions ); // check if extension files exist - let files_to_download: Vec<&RemotePath> = availiable_extensions + let files_to_download: Vec<&RemotePath> = available_extensions .iter() .filter(|ext| { ext.extension() == Some("sql") && ext.object_name().unwrap().starts_with(ext_name) @@ -144,22 +197,26 @@ pub async fn download_extension_sql_files( // download shared library file pub async fn download_library_file( lib_name: &str, - // availiable_libraries: &Vec, + available_libraries: &Vec, remote_storage: &GenericRemoteStorage, pgbin: &str, ) -> Result<()> { let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into(); let pg_version = get_pg_version(pgbin); - let remote_sharedir = RemotePath::new(&Path::new(&pg_version).join("lib/")).unwrap(); + let remote_libdir = RemotePath::new(&Path::new(&pg_version).join("lib/")).unwrap(); - // TODO cache availiable_libraries list on the first read to avoid unneeded s3 calls - let availiable_libraries = remote_storage.list_files(Some(&remote_sharedir)).await?; + info!( + "cached list of available_libraries files {:?}", + &available_libraries + ); + // TODO cache available_libraries list on the first read to avoid unneeded s3 calls + let available_libraries = remote_storage.list_files(Some(&remote_libdir)).await?; - info!("list of library files {:?}", &availiable_libraries); + info!("list of library files {:?}", &available_libraries); // check if the library file exists - let lib = availiable_libraries + let lib = available_libraries .iter() .find(|lib: &&RemotePath| lib.object_name().unwrap() == lib_name); diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 264312f01c..2f1f98f187 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -16,8 +16,6 @@ use tokio::task; use tracing::{error, info}; use tracing_utils::http::OtelName; -use crate::extension_server::{download_extension_sql_files, download_library_file}; - fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse { ComputeStatusResponse { start_time: state.start_time, diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index dbecc05360..31f66b0fe0 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -311,7 +311,7 @@ impl Endpoint { // TODO: use future host field from safekeeper spec // Pass the list of safekeepers to the replica so that it can connect to any of them, - // whichever is availiable. + // whichever is available. let sk_ports = self .env .safekeepers diff --git a/test_runner/regress/test_download_extensions.py b/test_runner/regress/test_download_extensions.py index b07221188c..ea1ffa298b 100644 --- a/test_runner/regress/test_download_extensions.py +++ b/test_runner/regress/test_download_extensions.py @@ -42,7 +42,7 @@ relocatable = true""" return output -@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.REAL_S3]) +@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.MOCK_S3]) def test_file_download(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind): """ Tests we can download a file @@ -66,7 +66,6 @@ def test_file_download(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re assert env.ext_remote_storage is not None assert env.remote_storage_client is not None - NUM_EXT = 5 PUB_EXT_ROOT = "v14/share/postgresql/extension" BUCKET_PREFIX = "5314225671" # this is the build number cleanup_files = [] @@ -79,8 +78,8 @@ def test_file_download(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re public_remote_name = f"{BUCKET_PREFIX}/{PUB_EXT_ROOT}/test_ext{i}.control" public_local_name = f"pg_install/{PUB_EXT_ROOT}/test_ext{i}.control" # private extensions - private_ext = BytesIO(bytes(ext_contents(str(tenant_id), i), "utf-8")) - private_remote_name = f"{BUCKET_PREFIX}/{str(tenant_id)}/private_ext{i}.control" + BytesIO(bytes(ext_contents(str(tenant_id), i), "utf-8")) + f"{BUCKET_PREFIX}/{str(tenant_id)}/private_ext{i}.control" private_local_name = f"pg_install/{PUB_EXT_ROOT}/private_ext{i}.control" cleanup_files += [public_local_name, private_local_name] @@ -158,20 +157,20 @@ def test_file_download(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re assert f"test_ext{i}" in all_extensions # assert f"private_ext{i}" in all_extensions - # cur.execute("CREATE EXTENSION test_ext0") - # cur.execute("SELECT extname FROM pg_extension") - # all_extensions = [x[0] for x in cur.fetchall()] - # log.info(all_extensions) - # assert "test_ext0" in all_extensions + cur.execute("CREATE EXTENSION test_ext0") + cur.execute("SELECT extname FROM pg_extension") + all_extensions = [x[0] for x in cur.fetchall()] + log.info(all_extensions) + assert "test_ext0" in all_extensions - # try: - # cur.execute("LOAD 'test_ext0.so'") - # except Exception as e: - # # expected to fail with - # # could not load library ... test_ext.so: file too short - # # because test_ext.so is not real library file - # log.info("LOAD test_ext0.so failed (expectedly): %s", e) - # assert "file too short" in str(e) + try: + cur.execute("LOAD 'test_ext0.so'") + except Exception as e: + # expected to fail with + # could not load library ... test_ext.so: file too short + # because test_ext.so is not real library file + log.info("LOAD test_ext0.so failed (expectedly): %s", e) + assert "file too short" in str(e) # TODO add more test cases: # - try to load non-existing library