add lakebase_v1 cplane impl

This commit is contained in:
Conrad Ludgate
2025-07-17 16:14:56 +01:00
parent 314babb0cb
commit e5f5c79eb1
8 changed files with 117 additions and 4 deletions

View File

@@ -192,7 +192,8 @@ async fn authenticate(
};
let conn_info = compute::ConnectInfo {
host: db_info.host.into(),
host: db_info.host.as_ref().into(),
server_name: db_info.host.into_string(),
port: db_info.port,
ssl_mode,
host_addr: None,

View File

@@ -33,6 +33,7 @@ impl LocalBackend {
conn_info: ConnectInfo {
host_addr: Some(postgres_addr.ip()),
host: postgres_addr.ip().to_string().into(),
server_name: postgres_addr.ip().to_string(),
port: postgres_addr.port(),
ssl_mode: SslMode::Disable,
},

View File

@@ -151,6 +151,7 @@ pub(crate) struct AuthInfo {
pub struct ConnectInfo {
pub host_addr: Option<IpAddr>,
pub host: Host,
pub server_name: String,
pub port: u16,
pub ssl_mode: SslMode,
}
@@ -303,6 +304,7 @@ impl ConnectInfo {
// require for our business.
let port = self.port;
let host = &*self.host;
let server_name = &*self.server_name;
let addrs = match self.host_addr {
Some(addr) => vec![SocketAddr::new(addr, port)],
@@ -312,7 +314,7 @@ impl ConnectInfo {
match connect_once(&*addrs).await {
Ok((sockaddr, stream)) => Ok((
sockaddr,
tls::connect_tls(stream, self.ssl_mode, config, host, tls).await?,
tls::connect_tls(stream, self.ssl_mode, config, server_name, tls).await?,
)),
Err(err) => {
warn!("couldn't connect to compute node at {host}:{port}: {err}");

View File

@@ -31,7 +31,7 @@ use crate::control_plane::{
use crate::metrics::Metrics;
use crate::proxy::retry::CouldRetry;
use crate::rate_limiter::WakeComputeRateLimiter;
use crate::types::{EndpointCacheKey, EndpointId, RoleName};
use crate::types::{EndpointCacheKey, EndpointId, Host, RoleName};
use crate::{compute, http, scram};
pub(crate) const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
@@ -313,7 +313,7 @@ impl NeonControlPlaneClient {
Some(_) => SslMode::Require,
None => SslMode::Disable,
};
let host = match body.server_name {
let host: Host = match body.server_name {
Some(host) => host.into(),
None => host.into(),
};
@@ -321,6 +321,7 @@ impl NeonControlPlaneClient {
let node = NodeInfo {
conn_info: compute::ConnectInfo {
host_addr,
server_name: host.to_string(),
host,
port,
ssl_mode,

View File

@@ -0,0 +1,104 @@
//! Production console backend.
use std::sync::Arc;
use ::http::HeaderName;
use postgres_client::config::SslMode;
use crate::auth::backend::ComputeUserInfo;
use crate::auth::backend::jwt::AuthRule;
use crate::compute::ConnectInfo;
use crate::context::RequestContext;
use crate::control_plane::errors::{
ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
};
use crate::control_plane::messages::{ColdStartInfo, EndpointRateLimitConfig, MetricsAuxInfo};
use crate::control_plane::{
AccessBlockerFlags, CachedNodeInfo, EndpointAccessControl, NodeInfo, RoleAccessControl,
};
use crate::intern::{BranchIdInt, ProjectIdInt};
use crate::types::{BranchId, EndpointId, ProjectId, RoleName};
pub(crate) const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
#[derive(Clone)]
pub struct LakebaseClient {
pub namespace: String,
pub port: u16,
}
impl LakebaseClient {
/// Construct an API object containing the auth parameters.
pub fn new(namespace: String, port: u16) -> Self {
Self { namespace, port }
}
}
impl super::ControlPlaneApi for LakebaseClient {
#[tracing::instrument(skip_all)]
async fn get_role_access_control(
&self,
_ctx: &RequestContext,
_endpoint: &EndpointId,
_role: &RoleName,
) -> Result<RoleAccessControl, crate::control_plane::errors::GetAuthInfoError> {
Ok(RoleAccessControl { secret: None })
}
#[tracing::instrument(skip_all)]
async fn get_endpoint_access_control(
&self,
_ctx: &RequestContext,
_endpoint: &EndpointId,
_role: &RoleName,
) -> Result<EndpointAccessControl, GetAuthInfoError> {
Ok(EndpointAccessControl {
allowed_ips: Arc::new(vec![]),
allowed_vpce: Arc::new(vec![]),
flags: AccessBlockerFlags::default(),
rate_limits: EndpointRateLimitConfig::default(),
})
}
#[tracing::instrument(skip_all)]
async fn get_endpoint_jwks(
&self,
_ctx: &RequestContext,
_endpoint: &EndpointId,
) -> Result<Vec<AuthRule>, GetEndpointJwksError> {
Err(GetEndpointJwksError::ControlPlane(
ControlPlaneError::Transport(std::io::Error::other("unsupported")),
))
}
#[tracing::instrument(skip_all)]
async fn wake_compute(
&self,
_ctx: &RequestContext,
user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, WakeComputeError> {
let instance_id = user_info.endpoint.as_str();
let namespace = self.namespace.as_str();
let host = format!("{instance_id}.{namespace}.svc.cluster.local").into();
let server_name = format!("{instance_id}.online-tables.dev.databricks.com");
let port = self.port;
Ok(CachedNodeInfo::new_uncached(NodeInfo {
conn_info: ConnectInfo {
host_addr: None,
host,
server_name,
port,
ssl_mode: SslMode::Require,
},
aux: MetricsAuxInfo {
endpoint_id: user_info.endpoint.normalize_intern(),
project_id: ProjectIdInt::from(&ProjectId::from("unknown")),
branch_id: BranchIdInt::from(&BranchId::from("unknown")),
compute_id: user_info.endpoint.as_str().into(),
cold_start_info: ColdStartInfo::WarmCached,
},
}))
}
}

View File

@@ -175,12 +175,14 @@ impl MockControlPlane {
let conn_info = match self.endpoint.host_str() {
None => ConnectInfo {
host_addr: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
server_name: "localhost".into(),
host: "localhost".into(),
port,
ssl_mode: SslMode::Disable,
},
Some(host) => ConnectInfo {
host_addr: IpAddr::from_str(host).ok(),
server_name: host.into(),
host: host.into(),
port,
ssl_mode: SslMode::Disable,

View File

@@ -1,4 +1,5 @@
pub mod cplane_proxy_v1;
pub mod lakebase_v1;
#[cfg(any(test, feature = "testing"))]
pub mod mock;

View File

@@ -531,6 +531,7 @@ fn helper_create_uncached_node_info() -> NodeInfo {
NodeInfo {
conn_info: compute::ConnectInfo {
host: "test".into(),
server_name: "test".into(),
port: 5432,
ssl_mode: SslMode::Disable,
host_addr: None,