diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 97fa45062b..c0e28790d6 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -1,5 +1,4 @@ -use std::collections::{HashMap, HashSet}; -use std::iter::once; +use std::collections::HashMap; use std::os::unix::fs::{PermissionsExt, symlink}; use std::path::Path; use std::process::{Command, Stdio}; @@ -13,9 +12,7 @@ use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use compute_api::privilege::Privilege; use compute_api::responses::{ComputeMetrics, ComputeStatus}; -use compute_api::spec::{ - ComputeFeature, ComputeMode, ComputeSpec, Database, ExtVersion, PgIdent, Role, -}; +use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent}; use futures::StreamExt; use futures::future::join_all; use futures::stream::FuturesUnordered; @@ -34,16 +31,6 @@ use utils::measured_stream::MeasuredReader; use crate::installed_extensions::get_installed_extensions; use crate::pg_helpers::*; use crate::spec::*; -use crate::spec_apply::ApplySpecPhase::{ - CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSchemaNeon, - CreateSuperUser, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions, - HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles, - RunInEachDatabase, -}; -use crate::spec_apply::PerDatabasePhase::{ - ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension, -}; -use crate::spec_apply::{DB, MutableApplyContext, PerDatabasePhase, apply_operations}; use crate::sync_sk::{check_if_synced, ping_safekeeper}; use crate::{config, extension_server, local_proxy}; @@ -928,388 +915,6 @@ impl ComputeNode { Ok(client) } - /// Apply the spec to the running PostgreSQL instance. - /// The caller can decide to run with multiple clients in parallel, or - /// single mode. Either way, the commands executed will be the same, and - /// only commands run in different databases are parallelized. - #[instrument(skip_all)] - pub fn apply_spec_sql( - &self, - spec: Arc, - conf: Arc, - concurrency: usize, - ) -> Result<()> { - info!("Applying config with max {} concurrency", concurrency); - debug!("Config: {:?}", spec); - - let rt = tokio::runtime::Handle::current(); - rt.block_on(async { - // Proceed with post-startup configuration. Note, that order of operations is important. - let client = Self::get_maintenance_client(&conf).await?; - let spec = spec.clone(); - - let databases = get_existing_dbs_async(&client).await?; - let roles = get_existing_roles_async(&client) - .await? - .into_iter() - .map(|role| (role.name.clone(), role)) - .collect::>(); - - // Check if we need to drop subscriptions before starting the endpoint. - // - // It is important to do this operation exactly once when endpoint starts on a new branch. - // Otherwise, we may drop not inherited, but newly created subscriptions. - // - // We cannot rely only on spec.drop_subscriptions_before_start flag, - // because if for some reason compute restarts inside VM, - // it will start again with the same spec and flag value. - // - // To handle this, we save the fact of the operation in the database - // in the neon.drop_subscriptions_done table. - // If the table does not exist, we assume that the operation was never performed, so we must do it. - // If table exists, we check if the operation was performed on the current timelilne. 
- // - let mut drop_subscriptions_done = false; - - if spec.drop_subscriptions_before_start { - let timeline_id = self.get_timeline_id().context("timeline_id must be set")?; - let query = format!("select 1 from neon.drop_subscriptions_done where timeline_id = '{}'", timeline_id); - - info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id); - - drop_subscriptions_done = match - client.simple_query(&query).await { - Ok(result) => { - matches!(&result[0], postgres::SimpleQueryMessage::Row(_)) - }, - Err(e) => - { - match e.code() { - Some(&SqlState::UNDEFINED_TABLE) => false, - _ => { - // We don't expect any other error here, except for the schema/table not existing - error!("Error checking if drop subscription operation was already performed: {}", e); - return Err(e.into()); - } - } - } - } - }; - - - let jwks_roles = Arc::new( - spec.as_ref() - .local_proxy_config - .iter() - .flat_map(|it| &it.jwks) - .flatten() - .flat_map(|setting| &setting.role_names) - .cloned() - .collect::>(), - ); - - let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext { - roles, - dbs: databases, - })); - - // Apply special pre drop database phase. - // NOTE: we use the code of RunInEachDatabase phase for parallelism - // and connection management, but we don't really run it in *each* database, - // only in databases, we're about to drop. - info!("Applying PerDatabase (pre-dropdb) phase"); - let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency)); - - // Run the phase for each database that we're about to drop. - let db_processes = spec - .delta_operations - .iter() - .flatten() - .filter_map(move |op| { - if op.action.as_str() == "delete_db" { - Some(op.name.clone()) - } else { - None - } - }) - .map(|dbname| { - let spec = spec.clone(); - let ctx = ctx.clone(); - let jwks_roles = jwks_roles.clone(); - let mut conf = conf.as_ref().clone(); - let concurrency_token = concurrency_token.clone(); - // We only need dbname field for this phase, so set other fields to dummy values - let db = DB::UserDB(Database { - name: dbname.clone(), - owner: "cloud_admin".to_string(), - options: None, - restrict_conn: false, - invalid: false, - }); - - debug!("Applying per-database phases for Database {:?}", &db); - - match &db { - DB::SystemDB => {} - DB::UserDB(db) => { - conf.dbname(db.name.as_str()); - } - } - - let conf = Arc::new(conf); - let fut = Self::apply_spec_sql_db( - spec.clone(), - conf, - ctx.clone(), - jwks_roles.clone(), - concurrency_token.clone(), - db, - [DropLogicalSubscriptions].to_vec(), - ); - - Ok(spawn(fut)) - }) - .collect::>>(); - - for process in db_processes.into_iter() { - let handle = process?; - if let Err(e) = handle.await? { - // Handle the error case where the database does not exist - // We do not check whether the DB exists or not in the deletion phase, - // so we shouldn't be strict about it in pre-deletion cleanup as well. 
- if e.to_string().contains("does not exist") { - warn!("Error dropping subscription: {}", e); - } else { - return Err(e); - } - }; - } - - for phase in [ - CreateSuperUser, - DropInvalidDatabases, - RenameRoles, - CreateAndAlterRoles, - RenameAndDeleteDatabases, - CreateAndAlterDatabases, - CreateSchemaNeon, - ] { - info!("Applying phase {:?}", &phase); - apply_operations( - spec.clone(), - ctx.clone(), - jwks_roles.clone(), - phase, - || async { Ok(&client) }, - ) - .await?; - } - - info!("Applying RunInEachDatabase2 phase"); - let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency)); - - let db_processes = spec - .cluster - .databases - .iter() - .map(|db| DB::new(db.clone())) - // include - .chain(once(DB::SystemDB)) - .map(|db| { - let spec = spec.clone(); - let ctx = ctx.clone(); - let jwks_roles = jwks_roles.clone(); - let mut conf = conf.as_ref().clone(); - let concurrency_token = concurrency_token.clone(); - let db = db.clone(); - - debug!("Applying per-database phases for Database {:?}", &db); - - match &db { - DB::SystemDB => {} - DB::UserDB(db) => { - conf.dbname(db.name.as_str()); - } - } - - let conf = Arc::new(conf); - let mut phases = vec![ - DeleteDBRoleReferences, - ChangeSchemaPerms, - HandleAnonExtension, - ]; - - if spec.drop_subscriptions_before_start && !drop_subscriptions_done { - info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set"); - phases.push(DropLogicalSubscriptions); - } - - let fut = Self::apply_spec_sql_db( - spec.clone(), - conf, - ctx.clone(), - jwks_roles.clone(), - concurrency_token.clone(), - db, - phases, - ); - - Ok(spawn(fut)) - }) - .collect::>>(); - - for process in db_processes.into_iter() { - let handle = process?; - handle.await??; - } - - let mut phases = vec![ - HandleOtherExtensions, - HandleNeonExtension, // This step depends on CreateSchemaNeon - CreateAvailabilityCheck, - DropRoles, - ]; - - // This step depends on CreateSchemaNeon - if spec.drop_subscriptions_before_start && !drop_subscriptions_done { - info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set"); - phases.push(FinalizeDropLogicalSubscriptions); - } - - for phase in phases { - debug!("Applying phase {:?}", &phase); - apply_operations( - spec.clone(), - ctx.clone(), - jwks_roles.clone(), - phase, - || async { Ok(&client) }, - ) - .await?; - } - - Ok::<(), anyhow::Error>(()) - })?; - - Ok(()) - } - - /// Apply SQL migrations of the RunInEachDatabase phase. - /// - /// May opt to not connect to databases that don't have any scheduled - /// operations. The function is concurrency-controlled with the provided - /// semaphore. The caller has to make sure the semaphore isn't exhausted. - async fn apply_spec_sql_db( - spec: Arc, - conf: Arc, - ctx: Arc>, - jwks_roles: Arc>, - concurrency_token: Arc, - db: DB, - subphases: Vec, - ) -> Result<()> { - let _permit = concurrency_token.acquire().await?; - - let mut client_conn = None; - - for subphase in subphases { - apply_operations( - spec.clone(), - ctx.clone(), - jwks_roles.clone(), - RunInEachDatabase { - db: db.clone(), - subphase, - }, - // Only connect if apply_operation actually wants a connection. - // It's quite possible this database doesn't need any queries, - // so by not connecting we save time and effort connecting to - // that database. 
- || async { - if client_conn.is_none() { - let db_client = Self::get_maintenance_client(&conf).await?; - client_conn.replace(db_client); - } - let client = client_conn.as_ref().unwrap(); - Ok(client) - }, - ) - .await?; - } - - drop(client_conn); - - Ok::<(), anyhow::Error>(()) - } - - /// Choose how many concurrent connections to use for applying the spec changes. - pub fn max_service_connections( - &self, - compute_state: &ComputeState, - spec: &ComputeSpec, - ) -> usize { - // If the cluster is in Init state we don't have to deal with user connections, - // and can thus use all `max_connections` connection slots. However, that's generally not - // very efficient, so we generally still limit it to a smaller number. - if compute_state.status == ComputeStatus::Init { - // If the settings contain 'max_connections', use that as template - if let Some(config) = spec.cluster.settings.find("max_connections") { - config.parse::().ok() - } else { - // Otherwise, try to find the setting in the postgresql_conf string - spec.cluster - .postgresql_conf - .iter() - .flat_map(|conf| conf.split("\n")) - .filter_map(|line| { - if !line.contains("max_connections") { - return None; - } - - let (key, value) = line.split_once("=")?; - let key = key - .trim_start_matches(char::is_whitespace) - .trim_end_matches(char::is_whitespace); - - let value = value - .trim_start_matches(char::is_whitespace) - .trim_end_matches(char::is_whitespace); - - if key != "max_connections" { - return None; - } - - value.parse::().ok() - }) - .next() - } - // If max_connections is present, use at most 1/3rd of that. - // When max_connections is lower than 30, try to use at least 10 connections, but - // never more than max_connections. - .map(|limit| match limit { - 0..10 => limit, - 10..30 => 10, - 30.. => limit / 3, - }) - // If we didn't find max_connections, default to 10 concurrent connections. - .unwrap_or(10) - } else { - // state == Running - // Because the cluster is already in the Running state, we should assume users are - // already connected to the cluster, and high concurrency could negatively - // impact user connectivity. Therefore, we can limit concurrency to the number of - // reserved superuser connections, which users wouldn't be able to use anyway. - spec.cluster - .settings - .find("superuser_reserved_connections") - .iter() - .filter_map(|val| val.parse::().ok()) - .map(|val| if val > 1 { val - 1 } else { 1 }) - .last() - .unwrap_or(3) - } - } - /// Do initial configuration of the already started Postgres. 
     #[instrument(skip_all)]
     pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
diff --git a/compute_tools/src/spec_apply.rs b/compute_tools/src/spec_apply.rs
index b4e084fd91..f9a37c5c98 100644
--- a/compute_tools/src/spec_apply.rs
+++ b/compute_tools/src/spec_apply.rs
@@ -4,15 +4,413 @@
 use std::future::Future;
 use std::iter::{empty, once};
 use std::sync::Arc;
 
-use anyhow::Result;
+use anyhow::{Context, Result};
+use compute_api::responses::ComputeStatus;
 use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
 use futures::future::join_all;
 use tokio::sync::RwLock;
 use tokio_postgres::Client;
-use tracing::{Instrument, debug, info_span, warn};
+use tokio_postgres::error::SqlState;
+use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
 
-use crate::compute::construct_superuser_query;
-use crate::pg_helpers::{DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, escape_literal};
+use crate::compute::{ComputeNode, ComputeState, construct_superuser_query};
+use crate::pg_helpers::{
+    DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, escape_literal, get_existing_dbs_async,
+    get_existing_roles_async,
+};
+use crate::spec_apply::ApplySpecPhase::{
+    CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSchemaNeon,
+    CreateSuperUser, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
+    HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
+    RunInEachDatabase,
+};
+use crate::spec_apply::PerDatabasePhase::{
+    ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
+};
+
+impl ComputeNode {
+    /// Apply the spec to the running PostgreSQL instance.
+    /// The caller can decide to run with multiple clients in parallel, or in
+    /// single mode. Either way, the commands executed will be the same, and
+    /// only commands run in different databases are parallelized.
+    #[instrument(skip_all)]
+    pub fn apply_spec_sql(
+        &self,
+        spec: Arc<ComputeSpec>,
+        conf: Arc<tokio_postgres::Config>,
+        concurrency: usize,
+    ) -> Result<()> {
+        info!("Applying config with max {} concurrency", concurrency);
+        debug!("Config: {:?}", spec);
+
+        let rt = tokio::runtime::Handle::current();
+        rt.block_on(async {
+            // Proceed with post-startup configuration. Note that the order of operations is important.
+            let client = Self::get_maintenance_client(&conf).await?;
+            let spec = spec.clone();
+
+            let databases = get_existing_dbs_async(&client).await?;
+            let roles = get_existing_roles_async(&client)
+                .await?
+                .into_iter()
+                .map(|role| (role.name.clone(), role))
+                .collect::<HashMap<String, Role>>();
+
+            // Check if we need to drop subscriptions before starting the endpoint.
+            //
+            // It is important to do this operation exactly once when the endpoint starts on a new branch.
+            // Otherwise, we may drop newly created subscriptions instead of the inherited ones.
+            //
+            // We cannot rely only on the spec.drop_subscriptions_before_start flag,
+            // because if the compute restarts inside the VM for some reason,
+            // it will start again with the same spec and flag value.
+            //
+            // To handle this, we record the fact of the operation in the database,
+            // in the neon.drop_subscriptions_done table.
+            // If the table does not exist, we assume that the operation was never performed, so we must do it.
+            // If the table exists, we check whether the operation was performed on the current timeline.
+            //
+            let mut drop_subscriptions_done = false;
+
+            if spec.drop_subscriptions_before_start {
+                let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
+                let query = format!("select 1 from neon.drop_subscriptions_done where timeline_id = '{}'", timeline_id);
+
+                info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
+
+                drop_subscriptions_done = match
+                    client.simple_query(&query).await {
+                    Ok(result) => {
+                        matches!(&result[0], postgres::SimpleQueryMessage::Row(_))
+                    },
+                    Err(e) =>
+                    {
+                        match e.code() {
+                            Some(&SqlState::UNDEFINED_TABLE) => false,
+                            _ => {
+                                // We don't expect any error here other than the schema/table not existing
+                                error!("Error checking if drop subscription operation was already performed: {}", e);
+                                return Err(e.into());
+                            }
+                        }
+                    }
+                }
+            };
+
+
+            let jwks_roles = Arc::new(
+                spec.as_ref()
+                    .local_proxy_config
+                    .iter()
+                    .flat_map(|it| &it.jwks)
+                    .flatten()
+                    .flat_map(|setting| &setting.role_names)
+                    .cloned()
+                    .collect::<HashSet<String>>(),
+            );
+
+            let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
+                roles,
+                dbs: databases,
+            }));
+
+            // Apply the special pre-drop-database phase.
+            // NOTE: we use the code of the RunInEachDatabase phase for parallelism
+            // and connection management, but we don't really run it in *each* database,
+            // only in the databases we're about to drop.
+            info!("Applying PerDatabase (pre-dropdb) phase");
+            let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
+
+            // Run the phase for each database that we're about to drop.
+            let db_processes = spec
+                .delta_operations
+                .iter()
+                .flatten()
+                .filter_map(move |op| {
+                    if op.action.as_str() == "delete_db" {
+                        Some(op.name.clone())
+                    } else {
+                        None
+                    }
+                })
+                .map(|dbname| {
+                    let spec = spec.clone();
+                    let ctx = ctx.clone();
+                    let jwks_roles = jwks_roles.clone();
+                    let mut conf = conf.as_ref().clone();
+                    let concurrency_token = concurrency_token.clone();
+                    // We only need the dbname field for this phase, so set the other fields to dummy values
+                    let db = DB::UserDB(Database {
+                        name: dbname.clone(),
+                        owner: "cloud_admin".to_string(),
+                        options: None,
+                        restrict_conn: false,
+                        invalid: false,
+                    });
+
+                    debug!("Applying per-database phases for Database {:?}", &db);
+
+                    match &db {
+                        DB::SystemDB => {}
+                        DB::UserDB(db) => {
+                            conf.dbname(db.name.as_str());
+                        }
+                    }
+
+                    let conf = Arc::new(conf);
+                    let fut = Self::apply_spec_sql_db(
+                        spec.clone(),
+                        conf,
+                        ctx.clone(),
+                        jwks_roles.clone(),
+                        concurrency_token.clone(),
+                        db,
+                        [DropLogicalSubscriptions].to_vec(),
+                    );
+
+                    Ok(tokio::spawn(fut))
+                })
+                .collect::<Vec<Result<_, anyhow::Error>>>();
+
+            for process in db_processes.into_iter() {
+                let handle = process?;
+                if let Err(e) = handle.await? {
+                    // Handle the case where the database does not exist.
+                    // We do not check whether the DB exists in the deletion phase,
+                    // so we shouldn't be strict about it in the pre-deletion cleanup either.
+                    if e.to_string().contains("does not exist") {
+                        warn!("Error dropping subscription: {}", e);
+                    } else {
+                        return Err(e);
+                    }
+                };
+            }
+
+            for phase in [
+                CreateSuperUser,
+                DropInvalidDatabases,
+                RenameRoles,
+                CreateAndAlterRoles,
+                RenameAndDeleteDatabases,
+                CreateAndAlterDatabases,
+                CreateSchemaNeon,
+            ] {
+                info!("Applying phase {:?}", &phase);
+                apply_operations(
+                    spec.clone(),
+                    ctx.clone(),
+                    jwks_roles.clone(),
+                    phase,
+                    || async { Ok(&client) },
+                )
+                .await?;
+            }
+
+            info!("Applying RunInEachDatabase2 phase");
+            let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
+
+            let db_processes = spec
+                .cluster
+                .databases
+                .iter()
+                .map(|db| DB::new(db.clone()))
+                // include the system database as well
+                .chain(once(DB::SystemDB))
+                .map(|db| {
+                    let spec = spec.clone();
+                    let ctx = ctx.clone();
+                    let jwks_roles = jwks_roles.clone();
+                    let mut conf = conf.as_ref().clone();
+                    let concurrency_token = concurrency_token.clone();
+                    let db = db.clone();
+
+                    debug!("Applying per-database phases for Database {:?}", &db);
+
+                    match &db {
+                        DB::SystemDB => {}
+                        DB::UserDB(db) => {
+                            conf.dbname(db.name.as_str());
+                        }
+                    }
+
+                    let conf = Arc::new(conf);
+                    let mut phases = vec![
+                        DeleteDBRoleReferences,
+                        ChangeSchemaPerms,
+                        HandleAnonExtension,
+                    ];
+
+                    if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
+                        info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
+                        phases.push(DropLogicalSubscriptions);
+                    }
+
+                    let fut = Self::apply_spec_sql_db(
+                        spec.clone(),
+                        conf,
+                        ctx.clone(),
+                        jwks_roles.clone(),
+                        concurrency_token.clone(),
+                        db,
+                        phases,
+                    );
+
+                    Ok(tokio::spawn(fut))
+                })
+                .collect::<Vec<Result<_, anyhow::Error>>>();
+
+            for process in db_processes.into_iter() {
+                let handle = process?;
+                handle.await??;
+            }
+
+            let mut phases = vec![
+                HandleOtherExtensions,
+                HandleNeonExtension, // This step depends on CreateSchemaNeon
+                CreateAvailabilityCheck,
+                DropRoles,
+            ];
+
+            // This step depends on CreateSchemaNeon
+            if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
+                info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
+                phases.push(FinalizeDropLogicalSubscriptions);
+            }
+
+            for phase in phases {
+                debug!("Applying phase {:?}", &phase);
+                apply_operations(
+                    spec.clone(),
+                    ctx.clone(),
+                    jwks_roles.clone(),
+                    phase,
+                    || async { Ok(&client) },
+                )
+                .await?;
+            }
+
+            Ok::<(), anyhow::Error>(())
+        })?;
+
+        Ok(())
+    }
+
+    /// Apply SQL migrations of the RunInEachDatabase phase.
+    ///
+    /// May opt not to connect to databases that don't have any scheduled
+    /// operations. The function is concurrency-controlled with the provided
+    /// semaphore. The caller has to make sure the semaphore isn't exhausted.
+    async fn apply_spec_sql_db(
+        spec: Arc<ComputeSpec>,
+        conf: Arc<tokio_postgres::Config>,
+        ctx: Arc<RwLock<MutableApplyContext>>,
+        jwks_roles: Arc<HashSet<String>>,
+        concurrency_token: Arc<tokio::sync::Semaphore>,
+        db: DB,
+        subphases: Vec<PerDatabasePhase>,
+    ) -> Result<()> {
+        let _permit = concurrency_token.acquire().await?;
+
+        let mut client_conn = None;
+
+        for subphase in subphases {
+            apply_operations(
+                spec.clone(),
+                ctx.clone(),
+                jwks_roles.clone(),
+                RunInEachDatabase {
+                    db: db.clone(),
+                    subphase,
+                },
+                // Only connect if apply_operations actually wants a connection.
+                // It's quite possible this database doesn't need any queries,
+                // so by not connecting we save the time and effort of connecting to
+                // that database.
+                || async {
+                    if client_conn.is_none() {
+                        let db_client = Self::get_maintenance_client(&conf).await?;
+                        client_conn.replace(db_client);
+                    }
+                    let client = client_conn.as_ref().unwrap();
+                    Ok(client)
+                },
+            )
+            .await?;
+        }
+
+        drop(client_conn);
+
+        Ok::<(), anyhow::Error>(())
+    }
+
+    /// Choose how many concurrent connections to use for applying the spec changes.
+    pub fn max_service_connections(
+        &self,
+        compute_state: &ComputeState,
+        spec: &ComputeSpec,
+    ) -> usize {
+        // If the cluster is in the Init state, we don't have to deal with user connections,
+        // and can thus use all `max_connections` connection slots. However, that's generally not
+        // very efficient, so we still limit it to a smaller number.
+        if compute_state.status == ComputeStatus::Init {
+            // If the settings contain 'max_connections', use that as a template
+            if let Some(config) = spec.cluster.settings.find("max_connections") {
+                config.parse::<usize>().ok()
+            } else {
+                // Otherwise, try to find the setting in the postgresql_conf string
+                spec.cluster
+                    .postgresql_conf
+                    .iter()
+                    .flat_map(|conf| conf.split("\n"))
+                    .filter_map(|line| {
+                        if !line.contains("max_connections") {
+                            return None;
+                        }
+
+                        let (key, value) = line.split_once("=")?;
+                        let key = key
+                            .trim_start_matches(char::is_whitespace)
+                            .trim_end_matches(char::is_whitespace);
+
+                        let value = value
+                            .trim_start_matches(char::is_whitespace)
+                            .trim_end_matches(char::is_whitespace);
+
+                        if key != "max_connections" {
+                            return None;
+                        }
+
+                        value.parse::<usize>().ok()
+                    })
+                    .next()
+            }
+            // If max_connections is present, use at most 1/3rd of that.
+            // When max_connections is lower than 30, try to use at least 10 connections, but
+            // never more than max_connections.
+            .map(|limit| match limit {
+                0..10 => limit,
+                10..30 => 10,
+                30.. => limit / 3,
+            })
+            // If we didn't find max_connections, default to 10 concurrent connections.
+            .unwrap_or(10)
+        } else {
+            // state == Running
+            // Because the cluster is already in the Running state, we should assume users are
+            // already connected to the cluster, and high concurrency could negatively
+            // impact user connectivity. Therefore, we can limit concurrency to the number of
+            // reserved superuser connections, which users wouldn't be able to use anyway.
+            spec.cluster
+                .settings
+                .find("superuser_reserved_connections")
+                .iter()
+                .filter_map(|val| val.parse::<usize>().ok())
+                .map(|val| if val > 1 { val - 1 } else { 1 })
+                .last()
+                .unwrap_or(3)
+        }
+    }
+}
 
 #[derive(Clone)]
 pub enum DB {