feat: upgraded pg_catalog support (#6918)

* refactor: add datafusion-postgres dependency

* refactor: move and include pg_catalog udfs

* chore: update upstream

* feat: register table function pg_get_keywords

* feat: bridge CatalogInfo for our CatalogManager

Signed-off-by: Ning Sun <sunning@greptime.com>

* feat: convert pg_catalog table to our system table

* feat: bridge system catalog with datafusion-postgres

Signed-off-by: Ning Sun <sunning@greptime.com>

* feat: add more udfs

* feat: add compatibility rewriter to postgres handler

* fix: various fix

* fmt: fix

* fix: use functions from pg_catalog library

* fmt

* fix: sqlness runner

Signed-off-by: Ning Sun <sunning@greptime.com>

* test: adopt arrow 56.0 to 56.1 memory size change

* fix: add additional udfs

* chore: format

* refactor: return None when creating system table failed

Signed-off-by: Ning Sun <sunning@greptime.com>

* chore: provide safety comments about expect usage

---------

Signed-off-by: Ning Sun <sunning@greptime.com>
This commit is contained in:
Ning Sun
2025-09-25 12:05:34 +08:00
committed by GitHub
parent 91a727790d
commit 964dc254aa
31 changed files with 1294 additions and 1412 deletions

View File

@@ -35,6 +35,7 @@ common-version.workspace = true
common-workload.workspace = true
dashmap.workspace = true
datafusion.workspace = true
datafusion-postgres.workspace = true
datatypes.workspace = true
futures.workspace = true
futures-util.workspace = true
@@ -48,7 +49,6 @@ paste.workspace = true
prometheus.workspace = true
promql-parser.workspace = true
rand.workspace = true
rustc-hash.workspace = true
serde.workspace = true
serde_json.workspace = true
session.workspace = true

View File

@@ -12,53 +12,41 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod pg_catalog_memory_table;
mod pg_class;
mod pg_database;
mod pg_namespace;
mod table_names;
use std::collections::HashMap;
use std::sync::{Arc, LazyLock, Weak};
use std::sync::{Arc, Weak};
use common_catalog::consts::{self, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, PG_CATALOG_NAME};
use datatypes::schema::ColumnSchema;
use lazy_static::lazy_static;
use paste::paste;
use pg_catalog_memory_table::get_schema_columns;
use pg_class::PGClass;
use pg_database::PGDatabase;
use pg_namespace::PGNamespace;
use session::context::{Channel, QueryContext};
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, PG_CATALOG_NAME, PG_CATALOG_TABLE_ID_START};
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_telemetry::warn;
use datafusion::datasource::TableType;
use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion_postgres::pg_catalog::catalog_info::CatalogInfo;
use datafusion_postgres::pg_catalog::{
PgCatalogSchemaProvider, PgCatalogStaticTables, PgCatalogTable,
};
use snafu::ResultExt;
use store_api::storage::ScanRequest;
use table::TableRef;
pub use table_names::*;
use table::metadata::TableId;
use self::pg_namespace::oid_map::{PGNamespaceOidMap, PGNamespaceOidMapRef};
use crate::CatalogManager;
use crate::system_schema::memory_table::MemoryTable;
use crate::system_schema::utils::tables::u32_column;
use crate::system_schema::{SystemSchemaProvider, SystemSchemaProviderInner, SystemTableRef};
lazy_static! {
static ref MEMORY_TABLES: &'static [&'static str] = &[table_names::PG_TYPE];
}
/// The column name for the OID column.
/// The OID column is a unique identifier of type u32 for each object in the database.
const OID_COLUMN_NAME: &str = "oid";
fn oid_column() -> ColumnSchema {
u32_column(OID_COLUMN_NAME)
}
use crate::error::{InternalSnafu, ProjectSchemaSnafu, Result};
use crate::system_schema::{
SystemSchemaProvider, SystemSchemaProviderInner, SystemTable, SystemTableRef,
};
/// [`PGCatalogProvider`] is the provider for a schema named `pg_catalog`, it is not a catalog.
pub struct PGCatalogProvider {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
inner: PgCatalogSchemaProvider<CatalogManagerWrapper>,
tables: HashMap<String, TableRef>,
// Workaround to store mapping of schema_name to a numeric id
namespace_oid_map: PGNamespaceOidMapRef,
table_ids: HashMap<&'static str, u32>,
}
impl SystemSchemaProvider for PGCatalogProvider {
@@ -69,30 +57,33 @@ impl SystemSchemaProvider for PGCatalogProvider {
}
}
// TODO(j0hn50n133): Not sure whether to avoid duplication with `information_schema` or not.
macro_rules! setup_memory_table {
($name: expr) => {
paste! {
{
let (schema, columns) = get_schema_columns($name);
Some(Arc::new(MemoryTable::new(
consts::[<PG_CATALOG_ $name _TABLE_ID>],
$name,
schema,
columns
)) as _)
}
}
};
}
impl PGCatalogProvider {
pub fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
// safe to expect/unwrap because it contains only schema read, this can
// be ensured by sqlness tests
let static_tables =
PgCatalogStaticTables::try_new().expect("Failed to initialize static tables");
let inner = PgCatalogSchemaProvider::try_new(
CatalogManagerWrapper {
catalog_name: catalog_name.clone(),
catalog_manager,
},
Arc::new(static_tables),
)
.expect("Failed to initialize PgCatalogSchemaProvider");
let mut table_ids = HashMap::new();
let mut table_id = PG_CATALOG_TABLE_ID_START;
for name in datafusion_postgres::pg_catalog::PG_CATALOG_TABLES {
table_ids.insert(*name, table_id);
table_id += 1;
}
let mut provider = Self {
catalog_name,
catalog_manager,
inner,
tables: HashMap::new(),
namespace_oid_map: Arc::new(PGNamespaceOidMap::new()),
table_ids,
};
provider.build_tables();
provider
@@ -102,23 +93,13 @@ impl PGCatalogProvider {
// SECURITY NOTE:
// Must follow the same security rules as [`InformationSchemaProvider::build_tables`].
let mut tables = HashMap::new();
// TODO(J0HN50N133): modeling the table_name as a enum type to get rid of expect/unwrap here
// It's safe to unwrap here because we are sure that the constants have been handle correctly inside system_table.
for name in MEMORY_TABLES.iter() {
tables.insert(name.to_string(), self.build_table(name).expect(name));
for name in datafusion_postgres::pg_catalog::PG_CATALOG_TABLES {
if let Some(table) = self.build_table(name) {
tables.insert(name.to_string(), table);
}
}
tables.insert(
PG_NAMESPACE.to_string(),
self.build_table(PG_NAMESPACE).expect(PG_NAMESPACE),
);
tables.insert(
PG_CLASS.to_string(),
self.build_table(PG_CLASS).expect(PG_NAMESPACE),
);
tables.insert(
PG_DATABASE.to_string(),
self.build_table(PG_DATABASE).expect(PG_DATABASE),
);
self.tables = tables;
}
}
@@ -129,24 +110,26 @@ impl SystemSchemaProviderInner for PGCatalogProvider {
}
fn system_table(&self, name: &str) -> Option<SystemTableRef> {
match name {
table_names::PG_TYPE => setup_memory_table!(PG_TYPE),
table_names::PG_NAMESPACE => Some(Arc::new(PGNamespace::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
))),
table_names::PG_CLASS => Some(Arc::new(PGClass::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
))),
table_names::PG_DATABASE => Some(Arc::new(PGDatabase::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
))),
_ => None,
if let Some((table_name, table_id)) = self.table_ids.get_key_value(name) {
let table = self.inner.build_table_by_name(name).expect(name);
if let Some(table) = table {
if let Ok(system_table) = DFTableProviderAsSystemTable::try_new(
*table_id,
table_name,
table::metadata::TableType::Temporary,
table,
) {
Some(Arc::new(system_table))
} else {
warn!("failed to create pg_catalog system table {}", name);
None
}
} else {
None
}
} else {
None
}
}
@@ -155,11 +138,177 @@ impl SystemSchemaProviderInner for PGCatalogProvider {
}
}
/// Provide query context to call the [`CatalogManager`]'s method.
static PG_QUERY_CTX: LazyLock<QueryContext> = LazyLock::new(|| {
QueryContext::with_channel(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, Channel::Postgres)
});
fn query_ctx() -> Option<&'static QueryContext> {
Some(&PG_QUERY_CTX)
#[derive(Clone)]
pub struct CatalogManagerWrapper {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
}
impl CatalogManagerWrapper {
fn catalog_manager(&self) -> std::result::Result<Arc<dyn CatalogManager>, DataFusionError> {
self.catalog_manager.upgrade().ok_or_else(|| {
DataFusionError::Internal("Failed to access catalog manager".to_string())
})
}
}
impl std::fmt::Debug for CatalogManagerWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CatalogManagerWrapper").finish()
}
}
#[async_trait]
impl CatalogInfo for CatalogManagerWrapper {
async fn catalog_names(&self) -> std::result::Result<Vec<String>, DataFusionError> {
if self.catalog_name == DEFAULT_CATALOG_NAME {
CatalogManager::catalog_names(self.catalog_manager()?.as_ref())
.await
.map_err(|e| DataFusionError::External(Box::new(e)))
} else {
Ok(vec![self.catalog_name.to_string()])
}
}
async fn schema_names(
&self,
catalog_name: &str,
) -> std::result::Result<Option<Vec<String>>, DataFusionError> {
self.catalog_manager()?
.schema_names(catalog_name, None)
.await
.map(Some)
.map_err(|e| DataFusionError::External(Box::new(e)))
}
async fn table_names(
&self,
catalog_name: &str,
schema_name: &str,
) -> std::result::Result<Option<Vec<String>>, DataFusionError> {
self.catalog_manager()?
.table_names(catalog_name, schema_name, None)
.await
.map(Some)
.map_err(|e| DataFusionError::External(Box::new(e)))
}
async fn table_schema(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
) -> std::result::Result<Option<SchemaRef>, DataFusionError> {
let table = self
.catalog_manager()?
.table(catalog_name, schema_name, table_name, None)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(table.map(|t| t.schema().arrow_schema().clone()))
}
async fn table_type(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
) -> std::result::Result<Option<TableType>, DataFusionError> {
let table = self
.catalog_manager()?
.table(catalog_name, schema_name, table_name, None)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(table.map(|t| t.table_type().into()))
}
}
struct DFTableProviderAsSystemTable {
pub table_id: TableId,
pub table_name: &'static str,
pub table_type: table::metadata::TableType,
pub schema: Arc<datatypes::schema::Schema>,
pub table_provider: PgCatalogTable,
}
impl DFTableProviderAsSystemTable {
pub fn try_new(
table_id: TableId,
table_name: &'static str,
table_type: table::metadata::TableType,
table_provider: PgCatalogTable,
) -> Result<Self> {
let arrow_schema = table_provider.schema();
let schema = Arc::new(arrow_schema.try_into().context(ProjectSchemaSnafu)?);
Ok(Self {
table_id,
table_name,
table_type,
schema,
table_provider,
})
}
}
impl SystemTable for DFTableProviderAsSystemTable {
fn table_id(&self) -> TableId {
self.table_id
}
fn table_name(&self) -> &'static str {
self.table_name
}
fn schema(&self) -> Arc<datatypes::schema::Schema> {
self.schema.clone()
}
fn table_type(&self) -> table::metadata::TableType {
self.table_type
}
fn to_stream(&self, _request: ScanRequest) -> Result<SendableRecordBatchStream> {
match &self.table_provider {
PgCatalogTable::Static(table) => {
let schema = self.schema.arrow_schema().clone();
let data = table
.data()
.iter()
.map(|rb| Ok(rb.clone()))
.collect::<Vec<_>>();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::iter(data),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
PgCatalogTable::Dynamic(table) => {
let stream = table.execute(Arc::new(TaskContext::default()));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
PgCatalogTable::Empty(_) => {
let schema = self.schema.arrow_schema().clone();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::iter(vec![]),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}
}
}

View File

@@ -1,69 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::{Int16Vector, StringVector, UInt32Vector, VectorRef};
use crate::memory_table_cols;
use crate::system_schema::pg_catalog::oid_column;
use crate::system_schema::pg_catalog::table_names::PG_TYPE;
use crate::system_schema::utils::tables::{i16_column, string_column};
fn pg_type_schema_columns() -> (Vec<ColumnSchema>, Vec<VectorRef>) {
// TODO(j0hn50n133): acquire this information from `DataType` instead of hardcoding it to avoid regression.
memory_table_cols!(
[oid, typname, typlen],
[
(1, "String", -1),
(2, "Binary", -1),
(3, "Int8", 1),
(4, "Int16", 2),
(5, "Int32", 4),
(6, "Int64", 8),
(7, "UInt8", 1),
(8, "UInt16", 2),
(9, "UInt32", 4),
(10, "UInt64", 8),
(11, "Float32", 4),
(12, "Float64", 8),
(13, "Decimal", 16),
(14, "Date", 4),
(15, "DateTime", 8),
(16, "Timestamp", 8),
(17, "Time", 8),
(18, "Duration", 8),
(19, "Interval", 16),
(20, "List", -1),
]
);
(
// not quiet identical with pg, we only follow the definition in pg
vec![oid_column(), string_column("typname"), i16_column("typlen")],
vec![
Arc::new(UInt32Vector::from_vec(oid)), // oid
Arc::new(StringVector::from(typname)),
Arc::new(Int16Vector::from_vec(typlen)), // typlen in bytes
],
)
}
pub(super) fn get_schema_columns(table_name: &str) -> (SchemaRef, Vec<VectorRef>) {
let (column_schemas, columns): (_, Vec<VectorRef>) = match table_name {
PG_TYPE => pg_type_schema_columns(),
_ => unreachable!("Unknown table in pg_catalog: {}", table_name),
};
(Arc::new(Schema::new(column_schemas)), columns)
}

View File

@@ -1,276 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::PG_CATALOG_PG_CLASS_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use futures::TryStreamExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;
use table::metadata::TableType;
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::SystemTable;
use crate::system_schema::pg_catalog::pg_namespace::oid_map::PGNamespaceOidMapRef;
use crate::system_schema::pg_catalog::{OID_COLUMN_NAME, PG_CLASS, query_ctx};
use crate::system_schema::utils::tables::{string_column, u32_column};
// === column name ===
pub const RELNAME: &str = "relname";
pub const RELNAMESPACE: &str = "relnamespace";
pub const RELKIND: &str = "relkind";
pub const RELOWNER: &str = "relowner";
// === enum value of relkind ===
pub const RELKIND_TABLE: &str = "r";
pub const RELKIND_VIEW: &str = "v";
/// The initial capacity of the vector builders.
const INIT_CAPACITY: usize = 42;
/// The dummy owner id for the namespace.
const DUMMY_OWNER_ID: u32 = 0;
/// The `pg_catalog.pg_class` table implementation.
pub(super) struct PGClass {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
// Workaround to convert schema_name to a numeric id
namespace_oid_map: PGNamespaceOidMapRef,
}
impl PGClass {
pub(super) fn new(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
namespace_oid_map,
}
}
fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
u32_column(OID_COLUMN_NAME),
string_column(RELNAME),
u32_column(RELNAMESPACE),
string_column(RELKIND),
u32_column(RELOWNER),
]))
}
fn builder(&self) -> PGClassBuilder {
PGClassBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
)
}
}
impl fmt::Debug for PGClass {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PGClass")
.field("schema", &self.schema)
.field("catalog_name", &self.catalog_name)
.finish()
}
}
impl SystemTable for PGClass {
fn table_id(&self) -> table::metadata::TableId {
PG_CATALOG_PG_CLASS_TABLE_ID
}
fn table_name(&self) -> &'static str {
PG_CLASS
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn to_stream(
&self,
request: ScanRequest,
) -> Result<common_recordbatch::SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_class(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}
impl DfPartitionStream for PGClass {
fn schema(&self) -> &ArrowSchemaRef {
self.schema.arrow_schema()
}
fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_class(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}
/// Builds the `pg_catalog.pg_class` table row by row
/// TODO(J0HN50N133): `relowner` is always the [`DUMMY_OWNER_ID`] because we don't have users.
/// Once we have user system, make it the actual owner of the table.
struct PGClassBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
oid: UInt32VectorBuilder,
relname: StringVectorBuilder,
relnamespace: UInt32VectorBuilder,
relkind: StringVectorBuilder,
relowner: UInt32VectorBuilder,
}
impl PGClassBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
namespace_oid_map,
oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
relname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
relnamespace: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
relkind: StringVectorBuilder::with_capacity(INIT_CAPACITY),
relowner: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
}
}
async fn make_class(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager
.schema_names(&catalog_name, query_ctx())
.await?
{
let mut stream = catalog_manager.tables(&catalog_name, &schema_name, query_ctx());
while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();
self.add_class(
&predicates,
table_info.table_id(),
&schema_name,
&table_info.name,
if table_info.table_type == TableType::View {
RELKIND_VIEW
} else {
RELKIND_TABLE
},
);
}
}
self.finish()
}
fn add_class(
&mut self,
predicates: &Predicates,
oid: u32,
schema: &str,
table: &str,
kind: &str,
) {
let namespace_oid = self.namespace_oid_map.get_oid(schema);
let row = [
(OID_COLUMN_NAME, &Value::from(oid)),
(RELNAMESPACE, &Value::from(schema)),
(RELNAME, &Value::from(table)),
(RELKIND, &Value::from(kind)),
(RELOWNER, &Value::from(DUMMY_OWNER_ID)),
];
if !predicates.eval(&row) {
return;
}
self.oid.push(Some(oid));
self.relnamespace.push(Some(namespace_oid));
self.relname.push(Some(table));
self.relkind.push(Some(kind));
self.relowner.push(Some(DUMMY_OWNER_ID));
}
fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> = vec![
Arc::new(self.oid.finish()),
Arc::new(self.relname.finish()),
Arc::new(self.relnamespace.finish()),
Arc::new(self.relkind.finish()),
Arc::new(self.relowner.finish()),
];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}

View File

@@ -1,223 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::PG_CATALOG_PG_DATABASE_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::SystemTable;
use crate::system_schema::pg_catalog::pg_namespace::oid_map::PGNamespaceOidMapRef;
use crate::system_schema::pg_catalog::{OID_COLUMN_NAME, PG_DATABASE, query_ctx};
use crate::system_schema::utils::tables::{string_column, u32_column};
// === column name ===
pub const DATNAME: &str = "datname";
/// The initial capacity of the vector builders.
const INIT_CAPACITY: usize = 42;
/// The `pg_catalog.database` table implementation.
pub(super) struct PGDatabase {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
// Workaround to convert schema_name to a numeric id
namespace_oid_map: PGNamespaceOidMapRef,
}
impl std::fmt::Debug for PGDatabase {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PGDatabase")
.field("schema", &self.schema)
.field("catalog_name", &self.catalog_name)
.finish()
}
}
impl PGDatabase {
pub(super) fn new(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
namespace_oid_map,
}
}
fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
u32_column(OID_COLUMN_NAME),
string_column(DATNAME),
]))
}
fn builder(&self) -> PGCDatabaseBuilder {
PGCDatabaseBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
)
}
}
impl DfPartitionStream for PGDatabase {
fn schema(&self) -> &ArrowSchemaRef {
self.schema.arrow_schema()
}
fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_database(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}
impl SystemTable for PGDatabase {
fn table_id(&self) -> table::metadata::TableId {
PG_CATALOG_PG_DATABASE_TABLE_ID
}
fn table_name(&self) -> &'static str {
PG_DATABASE
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn to_stream(
&self,
request: ScanRequest,
) -> Result<common_recordbatch::SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_database(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}
/// Builds the `pg_catalog.pg_database` table row by row
/// `oid` use schema name as a workaround since we don't have numeric schema id.
/// `nspname` is the schema name.
struct PGCDatabaseBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
oid: UInt32VectorBuilder,
datname: StringVectorBuilder,
}
impl PGCDatabaseBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
namespace_oid_map,
oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
datname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}
async fn make_database(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager
.schema_names(&catalog_name, query_ctx())
.await?
{
self.add_database(&predicates, &schema_name);
}
self.finish()
}
fn add_database(&mut self, predicates: &Predicates, schema_name: &str) {
let oid = self.namespace_oid_map.get_oid(schema_name);
let row: [(&str, &Value); 2] = [
(OID_COLUMN_NAME, &Value::from(oid)),
(DATNAME, &Value::from(schema_name)),
];
if !predicates.eval(&row) {
return;
}
self.oid.push(Some(oid));
self.datname.push(Some(schema_name));
}
fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> =
vec![Arc::new(self.oid.finish()), Arc::new(self.datname.finish())];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
}

View File

@@ -1,221 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! The `pg_catalog.pg_namespace` table implementation.
//! namespace is a schema in greptime
pub(super) mod oid_map;
use std::fmt;
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
use common_catalog::consts::PG_CATALOG_PG_NAMESPACE_TABLE_ID;
use common_error::ext::BoxedError;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{DfSendableRecordBatchStream, RecordBatch, SendableRecordBatchStream};
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;
use crate::CatalogManager;
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::SystemTable;
use crate::system_schema::pg_catalog::{
OID_COLUMN_NAME, PG_NAMESPACE, PGNamespaceOidMapRef, query_ctx,
};
use crate::system_schema::utils::tables::{string_column, u32_column};
const NSPNAME: &str = "nspname";
const INIT_CAPACITY: usize = 42;
pub(super) struct PGNamespace {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
// Workaround to convert schema_name to a numeric id
oid_map: PGNamespaceOidMapRef,
}
impl PGNamespace {
pub(super) fn new(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
oid_map,
}
}
fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
// TODO(J0HN50N133): we do not have a numeric schema id, use schema name as a workaround. Use a proper schema id once we have it.
u32_column(OID_COLUMN_NAME),
string_column(NSPNAME),
]))
}
fn builder(&self) -> PGNamespaceBuilder {
PGNamespaceBuilder::new(
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.oid_map.clone(),
)
}
}
impl fmt::Debug for PGNamespace {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PGNamespace")
.field("schema", &self.schema)
.field("catalog_name", &self.catalog_name)
.finish()
}
}
impl SystemTable for PGNamespace {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn table_id(&self) -> table::metadata::TableId {
PG_CATALOG_PG_NAMESPACE_TABLE_ID
}
fn table_name(&self) -> &'static str {
PG_NAMESPACE
}
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_namespace(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
));
Ok(Box::pin(
RecordBatchStreamAdapter::try_new(stream)
.map_err(BoxedError::new)
.context(InternalSnafu)?,
))
}
}
impl DfPartitionStream for PGNamespace {
fn schema(&self) -> &ArrowSchemaRef {
self.schema.arrow_schema()
}
fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
let schema = self.schema.arrow_schema().clone();
let mut builder = self.builder();
Box::pin(DfRecordBatchStreamAdapter::new(
schema,
futures::stream::once(async move {
builder
.make_namespace(None)
.await
.map(|x| x.into_df_record_batch())
.map_err(Into::into)
}),
))
}
}
/// Builds the `pg_catalog.pg_namespace` table row by row
/// `oid` use schema name as a workaround since we don't have numeric schema id.
/// `nspname` is the schema name.
struct PGNamespaceBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
oid: UInt32VectorBuilder,
nspname: StringVectorBuilder,
}
impl PGNamespaceBuilder {
fn new(
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
namespace_oid_map,
oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
nspname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}
/// Construct the `pg_catalog.pg_namespace` virtual table
async fn make_namespace(&mut self, request: Option<ScanRequest>) -> Result<RecordBatch> {
let catalog_name = self.catalog_name.clone();
let catalog_manager = self
.catalog_manager
.upgrade()
.context(UpgradeWeakCatalogManagerRefSnafu)?;
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager
.schema_names(&catalog_name, query_ctx())
.await?
{
self.add_namespace(&predicates, &schema_name);
}
self.finish()
}
fn finish(&mut self) -> Result<RecordBatch> {
let columns: Vec<VectorRef> =
vec![Arc::new(self.oid.finish()), Arc::new(self.nspname.finish())];
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
}
fn add_namespace(&mut self, predicates: &Predicates, schema_name: &str) {
let oid = self.namespace_oid_map.get_oid(schema_name);
let row = [
(OID_COLUMN_NAME, &Value::from(oid)),
(NSPNAME, &Value::from(schema_name)),
];
if !predicates.eval(&row) {
return;
}
self.oid.push(Some(oid));
self.nspname.push(Some(schema_name));
}
}

View File

@@ -1,94 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::hash::BuildHasher;
use std::sync::Arc;
use dashmap::DashMap;
use rustc_hash::FxSeededState;
pub type PGNamespaceOidMapRef = Arc<PGNamespaceOidMap>;
// Workaround to convert schema_name to a numeric id,
// remove this when we have numeric schema id in greptime
pub struct PGNamespaceOidMap {
oid_map: DashMap<String, u32>,
// Rust use SipHasher by default, which provides resistance against DOS attacks.
// This will produce different hash value between each greptime instance. This will
// cause the sqlness test fail. We need a deterministic hash here to provide
// same oid for the same schema name with best effort and DOS attacks aren't concern here.
hasher: FxSeededState,
}
impl PGNamespaceOidMap {
pub fn new() -> Self {
Self {
oid_map: DashMap::new(),
hasher: FxSeededState::with_seed(0), // PLEASE DO NOT MODIFY THIS SEED VALUE!!!
}
}
fn oid_is_used(&self, oid: u32) -> bool {
self.oid_map.iter().any(|e| *e.value() == oid)
}
pub fn get_oid(&self, schema_name: &str) -> u32 {
if let Some(oid) = self.oid_map.get(schema_name) {
*oid
} else {
let mut oid = self.hasher.hash_one(schema_name) as u32;
while self.oid_is_used(oid) {
oid = self.hasher.hash_one(oid) as u32;
}
self.oid_map.insert(schema_name.to_string(), oid);
oid
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn oid_is_stable() {
let oid_map_1 = PGNamespaceOidMap::new();
let oid_map_2 = PGNamespaceOidMap::new();
let schema = "schema";
let oid = oid_map_1.get_oid(schema);
// oid keep stable in the same instance
assert_eq!(oid, oid_map_1.get_oid(schema));
// oid keep stable between different instances
assert_eq!(oid, oid_map_2.get_oid(schema));
}
#[test]
fn oid_collision() {
let oid_map = PGNamespaceOidMap::new();
let key1 = "3178510";
let key2 = "4215648";
// insert them into oid_map
let oid1 = oid_map.get_oid(key1);
let oid2 = oid_map.get_oid(key2);
// they should have different id
assert_ne!(oid1, oid2);
}
}

View File

@@ -1,22 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// https://www.postgresql.org/docs/current/catalog-pg-database.html
pub const PG_DATABASE: &str = "pg_database";
// https://www.postgresql.org/docs/current/catalog-pg-namespace.html
pub const PG_NAMESPACE: &str = "pg_namespace";
// https://www.postgresql.org/docs/current/catalog-pg-class.html
pub const PG_CLASS: &str = "pg_class";
// https://www.postgresql.org/docs/current/catalog-pg-type.html
pub const PG_TYPE: &str = "pg_type";

View File

@@ -27,22 +27,6 @@ pub fn string_column(name: &str) -> ColumnSchema {
)
}
pub fn u32_column(name: &str) -> ColumnSchema {
ColumnSchema::new(
str::to_lowercase(name),
ConcreteDataType::uint32_datatype(),
false,
)
}
pub fn i16_column(name: &str) -> ColumnSchema {
ColumnSchema::new(
str::to_lowercase(name),
ConcreteDataType::int16_datatype(),
false,
)
}
pub fn bigint_column(name: &str) -> ColumnSchema {
ColumnSchema::new(
str::to_lowercase(name),

View File

@@ -112,11 +112,8 @@ pub const INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID: u32 = 38;
// ----- End of information_schema tables -----
/// ----- Begin of pg_catalog tables -----
pub const PG_CATALOG_PG_CLASS_TABLE_ID: u32 = 256;
pub const PG_CATALOG_PG_TYPE_TABLE_ID: u32 = 257;
pub const PG_CATALOG_PG_NAMESPACE_TABLE_ID: u32 = 258;
pub const PG_CATALOG_PG_DATABASE_TABLE_ID: u32 = 259;
pub const PG_CATALOG_TABLE_ID_START: u32 = 256;
// Please leave at 128 table ids for Postgres
// ----- End of pg_catalog tables -----
pub const MITO_ENGINE: &str = "mito";

View File

@@ -37,6 +37,7 @@ datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-functions-aggregate-common.workspace = true
datafusion-physical-expr.workspace = true
datafusion-postgres.workspace = true
datatypes.workspace = true
derive_more = { version = "1", default-features = false, features = ["display"] }
geo = { version = "0.29", optional = true }

View File

@@ -21,8 +21,7 @@ mod version;
use build::BuildFunction;
use database::{
ConnectionIdFunction, CurrentSchemaFunction, DatabaseFunction, PgBackendPidFunction,
ReadPreferenceFunction, SessionUserFunction,
ConnectionIdFunction, DatabaseFunction, PgBackendPidFunction, ReadPreferenceFunction,
};
use pg_catalog::PGCatalogFunction;
use procedure_state::ProcedureStateFunction;
@@ -37,9 +36,7 @@ impl SystemFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(BuildFunction);
registry.register_scalar(VersionFunction);
registry.register_scalar(CurrentSchemaFunction);
registry.register_scalar(DatabaseFunction);
registry.register_scalar(SessionUserFunction);
registry.register_scalar(ReadPreferenceFunction);
registry.register_scalar(PgBackendPidFunction);
registry.register_scalar(ConnectionIdFunction);

View File

@@ -26,10 +26,6 @@ use crate::function::{Function, find_function_context};
#[derive(Clone, Debug, Default)]
pub struct DatabaseFunction;
#[derive(Clone, Debug, Default)]
pub struct CurrentSchemaFunction;
pub struct SessionUserFunction;
pub struct ReadPreferenceFunction;
#[derive(Display)]
@@ -41,8 +37,6 @@ pub struct PgBackendPidFunction;
pub struct ConnectionIdFunction;
const DATABASE_FUNCTION_NAME: &str = "database";
const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
const READ_PREFERENCE_FUNCTION_NAME: &str = "read_preference";
const PG_BACKEND_PID: &str = "pg_backend_pid";
const CONNECTION_ID: &str = "connection_id";
@@ -71,58 +65,6 @@ impl Function for DatabaseFunction {
}
}
// Though "current_schema" can be aliased to "database", to not cause any breaking changes,
// we are not doing it: not until https://github.com/apache/datafusion/issues/17469 is resolved.
impl Function for CurrentSchemaFunction {
fn name(&self) -> &str {
CURRENT_SCHEMA_FUNCTION_NAME
}
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8View)
}
fn signature(&self) -> Signature {
Signature::nullary(Volatility::Immutable)
}
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let func_ctx = find_function_context(&args)?;
let db = func_ctx.query_ctx.current_schema();
Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
}
}
impl Function for SessionUserFunction {
fn name(&self) -> &str {
SESSION_USER_FUNCTION_NAME
}
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8View)
}
fn signature(&self) -> Signature {
Signature::nullary(Volatility::Immutable)
}
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let func_ctx = find_function_context(&args)?;
let user = func_ctx.query_ctx.current_user();
Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
user.username().to_string(),
))))
}
}
impl Function for ReadPreferenceFunction {
fn name(&self) -> &str {
READ_PREFERENCE_FUNCTION_NAME
@@ -203,18 +145,6 @@ impl fmt::Display for DatabaseFunction {
}
}
impl fmt::Display for CurrentSchemaFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "CURRENT_SCHEMA")
}
}
impl fmt::Display for SessionUserFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SESSION_USER")
}
}
impl fmt::Display for ReadPreferenceFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "READ_PREFERENCE")

View File

@@ -12,29 +12,168 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod pg_get_userbyid;
mod table_is_visible;
mod version;
use pg_get_userbyid::PGGetUserByIdFunction;
use table_is_visible::PGTableIsVisibleFunction;
use std::sync::Arc;
use common_query::error::Result;
use datafusion::arrow::array::{ArrayRef, StringArray, as_boolean_array};
use datafusion::catalog::TableFunction;
use datafusion::common::ScalarValue;
use datafusion::common::utils::SingleRowListArrayBuilder;
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, Signature, Volatility};
use datafusion_postgres::pg_catalog::{self, PgCatalogStaticTables};
use datatypes::arrow::datatypes::{DataType, Field};
use derive_more::Display;
use version::PGVersionFunction;
use crate::function::{Function, find_function_context};
use crate::function_registry::FunctionRegistry;
#[macro_export]
macro_rules! pg_catalog_func_fullname {
($name:literal) => {
concat!("pg_catalog.", $name)
};
const CURRENT_SCHEMA_FUNCTION_NAME: &str = "current_schema";
const CURRENT_SCHEMAS_FUNCTION_NAME: &str = "current_schemas";
const SESSION_USER_FUNCTION_NAME: &str = "session_user";
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct CurrentSchemaFunction;
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct CurrentSchemasFunction;
#[derive(Clone, Debug, Default, Display)]
#[display("{}", self.name())]
pub struct SessionUserFunction;
// Though "current_schema" can be aliased to "database", to not cause any breaking changes,
// we are not doing it: not until https://github.com/apache/datafusion/issues/17469 is resolved.
impl Function for CurrentSchemaFunction {
fn name(&self) -> &str {
CURRENT_SCHEMA_FUNCTION_NAME
}
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8View)
}
fn signature(&self) -> Signature {
Signature::nullary(Volatility::Immutable)
}
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let func_ctx = find_function_context(&args)?;
let db = func_ctx.query_ctx.current_schema();
Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(db))))
}
}
impl Function for SessionUserFunction {
fn name(&self) -> &str {
SESSION_USER_FUNCTION_NAME
}
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8View)
}
fn signature(&self) -> Signature {
Signature::nullary(Volatility::Immutable)
}
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let func_ctx = find_function_context(&args)?;
let user = func_ctx.query_ctx.current_user();
Ok(ColumnarValue::Scalar(ScalarValue::Utf8View(Some(
user.username().to_string(),
))))
}
}
impl Function for CurrentSchemasFunction {
fn name(&self) -> &str {
CURRENT_SCHEMAS_FUNCTION_NAME
}
fn return_type(&self, _: &[DataType]) -> Result<DataType> {
Ok(DataType::List(Arc::new(Field::new(
"x",
DataType::Utf8View,
false,
))))
}
fn signature(&self) -> Signature {
Signature::exact(vec![DataType::Boolean], Volatility::Immutable)
}
fn invoke_with_args(
&self,
args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(&args.args)?;
let input = as_boolean_array(&args[0]);
// Create a UTF8 array with a single value
let mut values = vec!["public"];
// include implicit schemas
if input.value(0) {
values.push("information_schema");
values.push("pg_catalog");
values.push("greptime_private");
}
let list_array = SingleRowListArrayBuilder::new(Arc::new(StringArray::from(values)));
let array: ArrayRef = Arc::new(list_array.build_list_array());
Ok(ColumnarValue::Array(array))
}
}
pub(super) struct PGCatalogFunction;
impl PGCatalogFunction {
pub fn register(registry: &FunctionRegistry) {
registry.register_scalar(PGTableIsVisibleFunction);
registry.register_scalar(PGGetUserByIdFunction);
let static_tables =
Arc::new(PgCatalogStaticTables::try_new().expect("load postgres static tables"));
registry.register_scalar(PGVersionFunction);
registry.register_scalar(CurrentSchemaFunction);
registry.register_scalar(CurrentSchemasFunction);
registry.register_scalar(SessionUserFunction);
registry.register(pg_catalog::format_type::create_format_type_udf());
registry.register(pg_catalog::create_pg_get_partkeydef_udf());
registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
"has_table_privilege",
));
registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
"has_schema_privilege",
));
registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
"has_database_privilege",
));
registry.register(pg_catalog::has_privilege_udf::create_has_privilege_udf(
"has_any_column_privilege",
));
registry.register_table_function(TableFunction::new(
"pg_get_keywords".to_string(),
static_tables.pg_get_keywords.clone(),
));
registry.register(pg_catalog::create_pg_relation_is_publishable_udf());
registry.register(pg_catalog::create_pg_get_statisticsobjdef_columns_udf());
registry.register(pg_catalog::create_pg_get_userbyid_udf());
registry.register(pg_catalog::create_pg_table_is_visible());
registry.register(pg_catalog::pg_get_expr_udf::create_pg_get_expr_udf());
// TODO(sunng87): upgrade datafusion to add
//registry.register(pg_catalog::create_pg_encoding_to_char_udf());
}
}

View File

@@ -1,73 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{self};
use std::sync::Arc;
use common_query::error::Result;
use datafusion::arrow::datatypes::DataType as ArrowDataType;
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::{DataType, VectorRef};
use datatypes::types::LogicalPrimitiveType;
use datatypes::with_match_primitive_type_id;
use num_traits::AsPrimitive;
use crate::function::{Function, FunctionContext};
use crate::scalars::expression::{EvalContext, scalar_unary_op};
#[derive(Clone, Debug, Default)]
pub struct PGGetUserByIdFunction;
const NAME: &str = crate::pg_catalog_func_fullname!("pg_get_userbyid");
impl fmt::Display for PGGetUserByIdFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, crate::pg_catalog_func_fullname!("PG_GET_USERBYID"))
}
}
impl Function for PGGetUserByIdFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _: &[ArrowDataType]) -> Result<ArrowDataType> {
Ok(ArrowDataType::Utf8)
}
fn signature(&self) -> Signature {
Signature::uniform(
1,
vec![arrow::datatypes::DataType::UInt32],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
with_match_primitive_type_id!(columns[0].data_type().logical_type_id(), |$T| {
let col = scalar_unary_op::<<$T as LogicalPrimitiveType>::Native, String, _>(&columns[0], pg_get_user_by_id, &mut EvalContext::default())?;
Ok(Arc::new(col))
}, {
unreachable!()
})
}
}
fn pg_get_user_by_id<I>(table_oid: Option<I>, _ctx: &mut EvalContext) -> Option<String>
where
I: AsPrimitive<u32>,
{
// TODO(J0HN50N133): We lack way to get the user_info by a numeric value. Once we have it, we can implement this function.
table_oid.map(|_| "".to_string())
}

View File

@@ -1,73 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::{self};
use std::sync::Arc;
use common_query::error::Result;
use datafusion::arrow::datatypes::DataType as ArrowDataType;
use datafusion_expr::{Signature, Volatility};
use datatypes::prelude::{DataType, VectorRef};
use datatypes::types::LogicalPrimitiveType;
use datatypes::with_match_primitive_type_id;
use num_traits::AsPrimitive;
use crate::function::{Function, FunctionContext};
use crate::scalars::expression::{EvalContext, scalar_unary_op};
#[derive(Clone, Debug, Default)]
pub struct PGTableIsVisibleFunction;
const NAME: &str = crate::pg_catalog_func_fullname!("pg_table_is_visible");
impl fmt::Display for PGTableIsVisibleFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, crate::pg_catalog_func_fullname!("PG_TABLE_IS_VISIBLE"))
}
}
impl Function for PGTableIsVisibleFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _: &[ArrowDataType]) -> Result<ArrowDataType> {
Ok(ArrowDataType::Boolean)
}
fn signature(&self) -> Signature {
Signature::uniform(
1,
vec![arrow::datatypes::DataType::UInt32],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: &FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
with_match_primitive_type_id!(columns[0].data_type().logical_type_id(), |$T| {
let col = scalar_unary_op::<<$T as LogicalPrimitiveType>::Native, bool, _>(&columns[0], pg_table_is_visible, &mut EvalContext::default())?;
Ok(Arc::new(col))
}, {
unreachable!()
})
}
}
fn pg_table_is_visible<I>(table_oid: Option<I>, _ctx: &mut EvalContext) -> Option<bool>
where
I: AsPrimitive<u32>,
{
// There is no table visibility in greptime, so we always return true
table_oid.map(|_| true)
}

View File

@@ -27,13 +27,13 @@ pub(crate) struct PGVersionFunction;
impl fmt::Display for PGVersionFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, crate::pg_catalog_func_fullname!("VERSION"))
write!(f, "pg_catalog.VERSION")
}
}
impl Function for PGVersionFunction {
fn name(&self) -> &str {
crate::pg_catalog_func_fullname!("version")
"pg_catalog.version"
}
fn return_type(&self, _: &[DataType]) -> Result<DataType> {

View File

@@ -335,6 +335,14 @@ impl Value {
}
}
/// Cast value to Boolean. Return None if value is not a boolean type.
pub fn as_bool(&self) -> Option<bool> {
match self {
Value::Boolean(b) => Some(*b),
_ => None,
}
}
/// Returns the logical type of the value.
pub fn logical_type_id(&self) -> LogicalTypeId {
match self {
@@ -2781,21 +2789,21 @@ mod tests {
vector: &vector,
idx: 0,
}),
85,
74,
);
check_value_ref_size_eq(
&ValueRef::List(ListValueRef::Indexed {
vector: &vector,
idx: 1,
}),
85,
74,
);
check_value_ref_size_eq(
&ValueRef::List(ListValueRef::Indexed {
vector: &vector,
idx: 2,
}),
85,
74,
);
check_value_ref_size_eq(&ValueRef::Decimal128(Decimal128::new(1234, 3, 1)), 32)
}

View File

@@ -553,7 +553,7 @@ pub mod tests {
assert!(validity.is_set(0));
assert!(!validity.is_set(1));
assert!(validity.is_set(2));
assert_eq!(256, list_vector.memory_size());
assert_eq!(224, list_vector.memory_size());
let slice = list_vector.slice(0, 2).to_arrow_array();
let sliced_array = slice.as_any().downcast_ref::<ListArray>().unwrap();

View File

@@ -314,7 +314,7 @@ mod tests {
assert!(!v.is_const());
assert!(v.validity().is_all_valid());
assert!(!v.only_null());
assert_eq!(1088, v.memory_size());
assert_eq!(1040, v.memory_size());
for (i, s) in strs.iter().enumerate() {
assert_eq!(Value::from(*s), v.get(i));

View File

@@ -450,7 +450,7 @@ mod tests {
let key_bytes = num_keys as usize * 5;
assert_eq!(key_bytes * 2, metrics.key_bytes);
assert_eq!(key_bytes, builder.key_bytes_in_index);
assert_eq!(8850, builder.memory_size());
assert_eq!(8730, builder.memory_size());
let (dict, _) = builder.finish().unwrap();
assert_eq!(0, builder.key_bytes_in_index);

View File

@@ -56,6 +56,7 @@ dashmap.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-postgres.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
futures.workspace = true

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
@@ -115,17 +114,6 @@ pub(crate) fn process<'a>(query: &str, query_ctx: QueryContextRef) -> Option<Vec
}
}
pub(crate) fn rewrite_sql(query: &str) -> Cow<'_, str> {
// DBeaver tricky replacement for datafusion not support sql
// TODO: add more here
query
.replace(
"SELECT db.oid,db.* FROM pg_catalog.pg_database db",
"SELECT db.oid as _oid,db.* FROM pg_catalog.pg_database db",
)
.into()
}
#[cfg(test)]
mod test {
use session::context::{QueryContext, QueryContextRef};
@@ -211,12 +199,4 @@ mod test {
assert!(process("SHOW TABLES ", query_context.clone()).is_none());
assert!(process("SET TIME_ZONE=utc ", query_context.clone()).is_none());
}
#[test]
fn test_rewrite() {
assert_eq!(
"SELECT db.oid as _oid,db.* FROM pg_catalog.pg_database db",
rewrite_sql("SELECT db.oid,db.* FROM pg_catalog.pg_database db")
);
}
}

View File

@@ -21,6 +21,7 @@ use common_recordbatch::RecordBatch;
use common_recordbatch::error::Result as RecordBatchResult;
use common_telemetry::{debug, tracing};
use datafusion_common::ParamValues;
use datafusion_postgres::sql::PostgresCompatibilityParser;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::SchemaRef;
use futures::{Sink, SinkExt, Stream, StreamExt, future, stream};
@@ -67,14 +68,21 @@ impl SimpleQueryHandler for PostgresServerHandlerInner {
return Ok(vec![Response::EmptyQuery]);
}
let query = fixtures::rewrite_sql(query);
let query = query.as_ref();
let query = if let Ok(statements) = self.query_parser.compatibility_parser.parse(query) {
statements
.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>()
.join(";")
} else {
query.to_string()
};
if let Some(resps) = fixtures::process(query, query_ctx.clone()) {
if let Some(resps) = fixtures::process(&query, query_ctx.clone()) {
send_warning_opt(client, query_ctx).await?;
Ok(resps)
} else {
let outputs = self.query_handler.do_query(query, query_ctx.clone()).await;
let outputs = self.query_handler.do_query(&query, query_ctx.clone()).await;
let mut results = Vec::with_capacity(outputs.len());
@@ -181,6 +189,7 @@ where
pub struct DefaultQueryParser {
query_handler: ServerSqlQueryHandlerRef,
session: Arc<Session>,
compatibility_parser: PostgresCompatibilityParser,
}
impl DefaultQueryParser {
@@ -188,6 +197,7 @@ impl DefaultQueryParser {
DefaultQueryParser {
query_handler,
session,
compatibility_parser: PostgresCompatibilityParser::new(),
}
}
}
@@ -215,12 +225,20 @@ impl QueryParser for DefaultQueryParser {
});
}
let sql = fixtures::rewrite_sql(sql);
let sql = sql.as_ref();
let sql = if let Ok(mut statements) = self.compatibility_parser.parse(sql) {
statements.remove(0).to_string()
} else {
// bypass the error: it can run into error because of different
// versions of sqlparser
sql.to_string()
};
let mut stmts =
ParserContext::create_with_dialect(sql, &PostgreSqlDialect {}, ParseOptions::default())
.map_err(convert_err)?;
let mut stmts = ParserContext::create_with_dialect(
&sql,
&PostgreSqlDialect {},
ParseOptions::default(),
)
.map_err(convert_err)?;
if stmts.len() != 1 {
Err(PgWireError::UserError(Box::new(ErrorInfo::from(
PgErrorCode::Ec42P14,
@@ -245,7 +263,7 @@ impl QueryParser for DefaultQueryParser {
};
Ok(SqlPlan {
query: sql.to_owned(),
query: sql.clone(),
statement: Some(stmt),
plan,
schema,

View File

@@ -68,7 +68,9 @@ impl ParserContext<'_> {
.parser
.parse_literal_uint()
.context(error::SyntaxSnafu)?;
let _ = self.parser.parse_keyword(Keyword::FROM);
let _ = self
.parser
.parse_one_of_keywords(&[Keyword::FROM, Keyword::IN]);
let cursor_name = self
.parser

View File

@@ -130,7 +130,14 @@ impl ParserContext<'_> {
} else if self.consume_token("PROCESSLIST") {
self.parse_show_processlist(false)
} else {
self.unsupported(self.peek_token_as_string())
// follow postgres dialect and assume the next token is the variable
let variable = self
.parse_object_name()
.with_context(|_| error::UnexpectedSnafu {
expected: "a variable name",
actual: self.peek_token_as_string(),
})?;
Ok(Statement::ShowVariables(ShowVariables { variable }))
}
}

View File

@@ -101,6 +101,16 @@ impl std::fmt::Display for TableType {
}
}
impl From<TableType> for datafusion::datasource::TableType {
fn from(t: TableType) -> datafusion::datasource::TableType {
match t {
TableType::Base => datafusion::datasource::TableType::Base,
TableType::View => datafusion::datasource::TableType::View,
TableType::Temporary => datafusion::datasource::TableType::Temporary,
}
}
}
/// Identifier of the table.
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Default)]
pub struct TableIdent {