Fix downloading of sql files for extension and libraries.

Rust code refactoring and C code fixes.

Add test for CREATE EXTENSION and LOAD 'library'
This commit is contained in:
Anastasia Lubennikova
2023-06-23 16:11:43 +03:00
parent 4201f4695f
commit 7cdcc8a500
8 changed files with 274 additions and 97 deletions

View File

@@ -49,7 +49,7 @@ use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::{download_extension, init_remote_storage, ExtensionType};
use compute_tools::extension_server::{get_availiable_extensions, init_remote_storage};
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
@@ -76,13 +76,6 @@ fn main() -> Result<()> {
None => None,
};
let rt = Runtime::new().unwrap();
rt.block_on(async {
download_extension(&ext_remote_storage, ExtensionType::Shared, pgbin)
.await
.expect("download shared extensions should work");
});
let http_port = *matches
.get_one::<u16>("http-port")
.expect("http-port is required");
@@ -201,7 +194,9 @@ fn main() -> Result<()> {
live_config_allowed,
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_remote_storage: ext_remote_storage.clone(),
ext_remote_storage,
availiable_extensions: Vec::new(),
availiable_libraries: Vec::new(),
};
let compute = Arc::new(compute_node);
@@ -212,6 +207,19 @@ fn main() -> Result<()> {
let extension_server_port: u16 = http_port;
// exen before we have spec, we can get public availiable extensions
// TODO maybe convert get_availiable_extensions into ComputeNode method as well as other functions
let rt = Runtime::new().unwrap();
let copy_remote_storage = compute.ext_remote_storage.clone();
if let Some(remote_storage) = copy_remote_storage {
rt.block_on(async move {
get_availiable_extensions(&remote_storage, pgbin, None)
.await
.unwrap();
});
}
if !spec_set {
// No spec provided, hang waiting for it.
info!("no compute spec provided, waiting");
@@ -259,6 +267,17 @@ fn main() -> Result<()> {
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
// download private tenant extensions before postgres start
// TODO
// compute_node.availiable_extensions = get_availiable_extensions(ext_remote_storage,
// pg_version, //TODO
// pgbin,
// tenant_id); //TODO get tenant_id from spec
// download preload shared libraries before postgres start (if any)
// TODO
// download_library_file();
// Start Postgres
let mut delay_exit = false;
let mut exit_code = None;

View File

@@ -16,11 +16,11 @@ use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeMode, ComputeSpec};
use remote_storage::GenericRemoteStorage;
use remote_storage::{GenericRemoteStorage, RemotePath};
use crate::config;
use crate::pg_helpers::*;
use crate::spec::*;
use crate::{config, extension_server};
/// Compute node info shared across several `compute_ctl` threads.
pub struct ComputeNode {
@@ -49,6 +49,8 @@ pub struct ComputeNode {
pub state_changed: Condvar,
/// S3 extensions configuration variables
pub ext_remote_storage: Option<GenericRemoteStorage>,
pub availiable_extensions: Vec<RemotePath>,
pub availiable_libraries: Vec<RemotePath>,
}
#[derive(Clone, Debug)]
@@ -579,4 +581,28 @@ LIMIT 100",
"{{\"pg_stat_statements\": []}}".to_string()
}
}
pub async fn download_extension_sql_files(&self, filename: String) -> Result<()> {
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
extension_server::download_extension_sql_files(
&filename,
&remote_storage,
&self.pgbin,
)
.await
}
}
}
pub async fn download_library_file(&self, filename: String) -> Result<()> {
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
extension_server::download_library_file(&filename, &remote_storage, &self.pgbin)
.await
}
}
}
}

View File

@@ -1,4 +1,4 @@
use anyhow::{self, bail};
use anyhow::{self, bail, Result};
use remote_storage::*;
use serde_json::{self, Value};
use std::fs::File;
@@ -8,6 +8,7 @@ use std::path::{Path, PathBuf};
use std::str;
use tokio::io::AsyncReadExt;
use tracing::info;
use utils::id::TenantId;
fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
@@ -57,60 +58,117 @@ async fn download_helper(
Ok(())
}
pub enum ExtensionType {
Shared, // we just use the public folder here
Tenant(String), // String is tenant_id
Library(String), // String is name of the extension
}
pub async fn download_extension(
remote_storage: &Option<GenericRemoteStorage>,
ext_type: ExtensionType,
// download extension control files
//
// return list of all extension files to use it in the future searches
//
// if tenant_id is provided - search in a private per-tenant extension path,
// otherwise - in public extension path
//
pub async fn get_availiable_extensions(
remote_storage: &GenericRemoteStorage,
pgbin: &str,
) -> anyhow::Result<()> {
let remote_storage = match remote_storage {
Some(remote_storage) => remote_storage,
None => return Ok(()),
};
tenant_id: Option<TenantId>,
) -> anyhow::Result<Vec<RemotePath>> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let pg_version = get_pg_version(pgbin);
match ext_type {
ExtensionType::Shared => {
// 1. Download control files from s3-bucket/public/*.control to SHAREDIR/extension
// We can do this step even before we have spec,
// because public extensions are common for all projects.
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let remote_sharedir = Path::new(&pg_version).join("share/postgresql/extension");
let remote_sharedir = RemotePath::new(Path::new(&remote_sharedir))?;
let from_paths = remote_storage.list_files(Some(&remote_sharedir)).await?;
for remote_from_path in from_paths {
if remote_from_path.extension() == Some("control") {
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
}
}
}
ExtensionType::Tenant(tenant_id) => {
// 2. After we have spec, before project start
// Download control files from s3-bucket/[tenant-id]/*.control to SHAREDIR/extension
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let remote_path = RemotePath::new(Path::new(&tenant_id.to_string()))?;
let from_paths = remote_storage.list_files(Some(&remote_path)).await?;
for remote_from_path in from_paths {
if remote_from_path.extension() == Some("control") {
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
}
}
}
ExtensionType::Library(library_name) => {
// 3. After we have spec, before postgres start
// Download preload_shared_libraries from s3-bucket/public/[library-name].control into LIBDIR/
let remote_sharedir = match tenant_id {
None => RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension"))?,
Some(tenant_id) => RemotePath::new(
&Path::new(&pg_version)
.join(&tenant_id.to_string())
.join("share/postgresql/extension"),
)?,
};
let local_libdir: PathBuf = Path::new(&get_pg_config("--libdir", pgbin)).into();
let remote_path = format!("{library_name}.control");
let remote_from_path = RemotePath::new(Path::new(&remote_path))?;
download_helper(remote_storage, &remote_from_path, &local_libdir).await?;
info!(
"get_availiable_extensions remote_sharedir: {:?}, local_sharedir: {:?}",
remote_sharedir, local_sharedir
);
let from_paths = remote_storage.list_files(Some(&remote_sharedir)).await?;
// download all found control files
for remote_from_path in &from_paths {
if remote_from_path.extension() == Some("control") {
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
}
}
Ok(from_paths)
}
// download all sql files for a given extension name
//
pub async fn download_extension_sql_files(
ext_name: &str,
//availiable_extensions: &Vec<RemotePath>,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
) -> Result<()> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let pg_version = get_pg_version(pgbin);
let remote_sharedir =
RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension")).unwrap();
// TODO cache availiable_extensions list on the first read to avoid unneeded s3 calls
let availiable_extensions = remote_storage.list_files(Some(&remote_sharedir)).await?;
info!(
"list of availiable_extension files {:?}",
&availiable_extensions
);
// check if extension files exist
let files_to_download: Vec<&RemotePath> = availiable_extensions
.iter()
.filter(|ext| {
ext.extension() == Some("sql") && ext.object_name().unwrap().starts_with(ext_name)
})
.collect();
if files_to_download.is_empty() {
bail!("Files for extension {ext_name} are not found in the extension store");
}
for remote_from_path in files_to_download {
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
}
Ok(())
}
// download shared library file
pub async fn download_library_file(
lib_name: &str,
// availiable_libraries: &Vec<RemotePath>,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
) -> Result<()> {
let local_libdir: PathBuf = Path::new(&get_pg_config("--pkglibdir", pgbin)).into();
let pg_version = get_pg_version(pgbin);
let remote_sharedir = RemotePath::new(&Path::new(&pg_version).join("lib/")).unwrap();
// TODO cache availiable_libraries list on the first read to avoid unneeded s3 calls
let availiable_libraries = remote_storage.list_files(Some(&remote_sharedir)).await?;
info!("list of library files {:?}", &availiable_libraries);
// check if the library file exists
let lib = availiable_libraries
.iter()
.find(|lib: &&RemotePath| lib.object_name().unwrap() == lib_name);
match lib {
None => bail!("Shared library file {lib_name} is not found in the extension store"),
Some(lib) => {
download_helper(remote_storage, &lib, &local_libdir).await?;
}
}
Ok(())
}

View File

@@ -16,7 +16,7 @@ use tokio::task;
use tracing::{error, info};
use tracing_utils::http::OtelName;
use crate::extension_server::{self, ExtensionType};
use crate::extension_server::{download_extension_sql_files, download_library_file};
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
ComputeStatusResponse {
@@ -126,25 +126,51 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
// download extension files from S3 on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
info!("req.uri {:?}", req.uri());
let mut is_library = false;
if let Some(params) = req.uri().query() {
info!("serving {:?} POST request with params: {}", route, params);
if params == "is_library=true" {
is_library = true;
} else {
let mut resp = Response::new(Body::from("Wrong request parameters"));
*resp.status_mut() = StatusCode::BAD_REQUEST;
return resp;
}
}
let filename = route.split('/').last().unwrap().to_string();
info!(
"serving /extension_server POST request, filename: {:?}",
&filename
"serving /extension_server POST request, filename: {:?} is_library: {}",
filename, is_library
);
match extension_server::download_extension(
&compute.ext_remote_storage,
ExtensionType::Library(filename),
&compute.pgbin,
)
.await
{
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("download_extension failed: {}", e);
Response::new(Body::from(e.to_string()))
if is_library {
match compute.download_library_file(filename.to_string()).await {
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("library download failed: {}", e);
let mut resp = Response::new(Body::from(e.to_string()));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
resp
}
}
} else {
match compute
.download_extension_sql_files(filename.to_string())
.await
{
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("extension download failed: {}", e);
let mut resp = Response::new(Body::from(e.to_string()));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
resp
}
}
}
}