fix: change the type of oid in pg_namespace to u32 (#4541)

* fix:  change the type of oid in pg_namespace to u32

* fix: header and correct logic of update oid
This commit is contained in:
JohnsonLee
2024-08-10 23:06:14 +08:00
committed by GitHub
parent 9532ffb954
commit 6694d2a930
10 changed files with 177 additions and 24 deletions

17
Cargo.lock generated
View File

@@ -983,7 +983,7 @@ dependencies = [
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"rustc-hash 1.1.0",
"shlex",
"syn 2.0.66",
]
@@ -1349,6 +1349,7 @@ dependencies = [
"partition",
"paste",
"prometheus",
"rustc-hash 2.0.0",
"serde_json",
"session",
"snafu 0.8.4",
@@ -4590,7 +4591,7 @@ dependencies = [
"pusherator",
"ref-cast",
"regex",
"rustc-hash",
"rustc-hash 1.1.0",
"sealed",
"serde",
"serde_json",
@@ -9339,6 +9340,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc-hash"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152"
[[package]]
name = "rustc_version"
version = "0.4.0"
@@ -9599,7 +9606,7 @@ dependencies = [
"num-traits",
"phf",
"phf_codegen",
"rustc-hash",
"rustc-hash 1.1.0",
"rustpython-ast",
"rustpython-compiler-core",
"tiny-keccak",
@@ -11357,7 +11364,7 @@ dependencies = [
"rayon",
"regex",
"rust-stemmers",
"rustc-hash",
"rustc-hash 1.1.0",
"serde",
"serde_json",
"sketches-ddsketch",
@@ -12427,7 +12434,7 @@ dependencies = [
"log",
"regex",
"regex-syntax 0.6.29",
"rustc-hash",
"rustc-hash 1.1.0",
"semver",
"serde",
"serde_json",

View File

@@ -155,6 +155,7 @@ rskafka = "0.5"
rstest = "0.21"
rstest_reuse = "0.7"
rust_decimal = "1.33"
rustc-hash = "2.0"
schemars = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["float_roundtrip"] }

View File

@@ -40,6 +40,7 @@ moka = { workspace = true, features = ["future", "sync"] }
partition.workspace = true
paste = "1.0"
prometheus.workspace = true
rustc-hash.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true

View File

@@ -30,6 +30,7 @@ use pg_namespace::PGNamespace;
use table::TableRef;
pub use table_names::*;
use self::pg_namespace::oid_map::{PGNamespaceOidMap, PGNamespaceOidMapRef};
use super::memory_table::MemoryTable;
use super::utils::tables::u32_column;
use super::{SystemSchemaProvider, SystemSchemaProviderInner, SystemTableRef};
@@ -52,6 +53,9 @@ pub struct PGCatalogProvider {
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
tables: HashMap<String, TableRef>,
// Workaround to store mapping of schema_name to a numeric id
namespace_oid_map: PGNamespaceOidMapRef,
}
impl SystemSchemaProvider for PGCatalogProvider {
@@ -85,6 +89,7 @@ impl PGCatalogProvider {
catalog_name,
catalog_manager,
tables: HashMap::new(),
namespace_oid_map: Arc::new(PGNamespaceOidMap::new()),
};
provider.build_tables();
provider
@@ -122,10 +127,12 @@ impl SystemSchemaProviderInner for PGCatalogProvider {
table_names::PG_NAMESPACE => Some(Arc::new(PGNamespace::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
))),
table_names::PG_CLASS => Some(Arc::new(PGClass::new(
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
))),
_ => None,
}

View File

@@ -31,6 +31,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;
use table::metadata::TableType;
use super::pg_namespace::oid_map::PGNamespaceOidMapRef;
use super::{OID_COLUMN_NAME, PG_CLASS};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
@@ -60,14 +61,22 @@ pub(super) struct PGClass {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
// Workaround to convert schema_name to a numeric id
namespace_oid_map: PGNamespaceOidMapRef,
}
impl PGClass {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
pub(super) fn new(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
namespace_oid_map,
}
}
@@ -75,7 +84,7 @@ impl PGClass {
Arc::new(Schema::new(vec![
u32_column(OID_COLUMN_NAME),
string_column(RELNAME),
string_column(RELNAMESPACE),
u32_column(RELNAMESPACE),
string_column(RELKIND),
u32_column(RELOWNER),
]))
@@ -86,6 +95,7 @@ impl PGClass {
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.namespace_oid_map.clone(),
)
}
}
@@ -155,10 +165,11 @@ struct PGClassBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
oid: UInt32VectorBuilder,
relname: StringVectorBuilder,
relnamespace: StringVectorBuilder,
relnamespace: UInt32VectorBuilder,
relkind: StringVectorBuilder,
relowner: UInt32VectorBuilder,
}
@@ -168,15 +179,17 @@ impl PGClassBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
namespace_oid_map,
oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
relname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
relnamespace: StringVectorBuilder::with_capacity(INIT_CAPACITY),
relnamespace: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
relkind: StringVectorBuilder::with_capacity(INIT_CAPACITY),
relowner: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
}
@@ -217,6 +230,7 @@ impl PGClassBuilder {
table: &str,
kind: &str,
) {
let namespace_oid = self.namespace_oid_map.get_oid(schema);
let row = [
(OID_COLUMN_NAME, &Value::from(oid)),
(RELNAMESPACE, &Value::from(schema)),
@@ -230,7 +244,7 @@ impl PGClassBuilder {
}
self.oid.push(Some(oid));
self.relnamespace.push(Some(schema));
self.relnamespace.push(Some(namespace_oid));
self.relname.push(Some(table));
self.relkind.push(Some(kind));
self.relowner.push(Some(DUMMY_OWNER_ID));

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(super) mod oid_map;
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
@@ -25,16 +27,16 @@ use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use datatypes::vectors::{StringVectorBuilder, UInt32VectorBuilder, VectorRef};
use snafu::{OptionExt, ResultExt};
use store_api::storage::ScanRequest;
use super::{OID_COLUMN_NAME, PG_NAMESPACE};
use super::{PGNamespaceOidMapRef, OID_COLUMN_NAME, PG_NAMESPACE};
use crate::error::{
CreateRecordBatchSnafu, InternalSnafu, Result, UpgradeWeakCatalogManagerRefSnafu,
};
use crate::information_schema::Predicates;
use crate::system_schema::utils::tables::string_column;
use crate::system_schema::utils::tables::{string_column, u32_column};
use crate::system_schema::SystemTable;
use crate::CatalogManager;
@@ -48,21 +50,29 @@ pub(super) struct PGNamespace {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
// Workaround to convert schema_name to a numeric id
oid_map: PGNamespaceOidMapRef,
}
impl PGNamespace {
pub(super) fn new(catalog_name: String, catalog_manager: Weak<dyn CatalogManager>) -> Self {
pub(super) fn new(
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema: Self::schema(),
catalog_name,
catalog_manager,
oid_map,
}
}
fn schema() -> SchemaRef {
Arc::new(Schema::new(vec![
// TODO(J0HN50N133): we do not have a numeric schema id, use schema name as a workaround. Use a proper schema id once we have it.
string_column(OID_COLUMN_NAME),
u32_column(OID_COLUMN_NAME),
string_column(NSPNAME),
]))
}
@@ -72,6 +82,7 @@ impl PGNamespace {
self.schema.clone(),
self.catalog_name.clone(),
self.catalog_manager.clone(),
self.oid_map.clone(),
)
}
}
@@ -138,8 +149,9 @@ struct PGNamespaceBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
oid: StringVectorBuilder,
oid: UInt32VectorBuilder,
nspname: StringVectorBuilder,
}
@@ -148,12 +160,14 @@ impl PGNamespaceBuilder {
schema: SchemaRef,
catalog_name: String,
catalog_manager: Weak<dyn CatalogManager>,
namespace_oid_map: PGNamespaceOidMapRef,
) -> Self {
Self {
schema,
catalog_name,
catalog_manager,
oid: StringVectorBuilder::with_capacity(INIT_CAPACITY),
namespace_oid_map,
oid: UInt32VectorBuilder::with_capacity(INIT_CAPACITY),
nspname: StringVectorBuilder::with_capacity(INIT_CAPACITY),
}
}
@@ -178,14 +192,15 @@ impl PGNamespaceBuilder {
}
fn add_namespace(&mut self, predicates: &Predicates, schema_name: &str) {
let oid = self.namespace_oid_map.get_oid(schema_name);
let row = [
(OID_COLUMN_NAME, &Value::from(schema_name)),
(OID_COLUMN_NAME, &Value::from(oid)),
(NSPNAME, &Value::from(schema_name)),
];
if !predicates.eval(&row) {
return;
}
self.oid.push(Some(schema_name));
self.oid.push(Some(oid));
self.nspname.push(Some(schema_name));
}
}

View File

@@ -0,0 +1,100 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::hash::BuildHasher;
use std::sync::Arc;
use dashmap::DashMap;
use rustc_hash::FxSeededState;
pub type PGNamespaceOidMapRef = Arc<PGNamespaceOidMap>;
// Workaround to convert schema_name to a numeric id,
// remove this when we have numeric schema id in greptime
pub struct PGNamespaceOidMap {
oid_map: DashMap<String, u32>,
// Rust use SipHasher by default, which provides resistance against DOS attacks.
// This will produce different hash value between each greptime instance. This will
// cause the sqlness test fail. We need a deterministic hash here to provide
// same oid for the same schema name with best effort and DOS attacks aren't concern here.
hasher: FxSeededState,
}
impl PGNamespaceOidMap {
pub fn new() -> Self {
Self {
oid_map: DashMap::new(),
hasher: FxSeededState::with_seed(0), // PLEASE DO NOT MODIFY THIS SEED VALUE!!!
}
}
fn oid_is_used(&self, oid: u32) -> bool {
self.oid_map.iter().any(|e| *e.value() == oid)
}
pub fn get_oid(&self, schema_name: &str) -> u32 {
if let Some(oid) = self.oid_map.get(schema_name) {
*oid
} else {
let mut oid = self.hasher.hash_one(schema_name) as u32;
while self.oid_is_used(oid) {
oid = self.hasher.hash_one(oid) as u32;
}
self.oid_map.insert(schema_name.to_string(), oid);
oid
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn oid_is_stable() {
let oid_map_1 = PGNamespaceOidMap::new();
let oid_map_2 = PGNamespaceOidMap::new();
let schema = "schema";
let oid = oid_map_1.get_oid(schema);
// oid keep stable in the same instance
assert_eq!(oid, oid_map_1.get_oid(schema));
// oid keep stable between different instances
assert_eq!(oid, oid_map_2.get_oid(schema));
}
#[test]
fn oid_collision() {
let oid_map = PGNamespaceOidMap::new();
let key1 = "3178510";
let key2 = "4215648";
// have collision
assert_eq!(
oid_map.hasher.hash_one(key1) as u32,
oid_map.hasher.hash_one(key2) as u32
);
// insert them into oid_map
let oid1 = oid_map.get_oid(key1);
let oid2 = oid_map.get_oid(key2);
// they should have different id
assert_ne!(oid1, oid2);
}
}

View File

@@ -416,10 +416,10 @@ select * from information_schema.columns order by table_schema, table_name, colu
| greptime | pg_catalog | pg_class | oid | 1 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
| greptime | pg_catalog | pg_class | relkind | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | pg_catalog | pg_class | relname | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | pg_catalog | pg_class | relnamespace | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | pg_catalog | pg_class | relnamespace | 3 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
| greptime | pg_catalog | pg_class | relowner | 5 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
| greptime | pg_catalog | pg_namespace | nspname | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | pg_catalog | pg_namespace | oid | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
| greptime | pg_catalog | pg_namespace | oid | 1 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
| greptime | pg_catalog | pg_type | oid | 1 | | | 10 | 0 | | | | | | select,insert | | UInt32 | int unsigned | FIELD | | No | int unsigned | | |
| greptime | pg_catalog | pg_type | typlen | 3 | | | 5 | 0 | | | | | | select,insert | | Int16 | smallint | FIELD | | No | smallint | | |
| greptime | pg_catalog | pg_type | typname | 2 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |

View File

@@ -70,6 +70,11 @@ ORDER BY 1,2;
| public | numbers | table | |
+--------+---------+-------+-------+
-- make sure oid of namespace keep stable
SELECT * FROM pg_namespace ORDER BY oid;
Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.public.pg_namespace
create
database my_db;
@@ -167,7 +172,7 @@ order by relnamespace, relname;
+--------------+---------+---------+
| relnamespace | relname | relkind |
+--------------+---------+---------+
| my_db | foo | r |
| 434869349 | foo | r |
+--------------+---------+---------+
use public;
@@ -190,7 +195,7 @@ desc table pg_class;
+--------------+--------+-----+------+---------+---------------+
| oid | UInt32 | | NO | | FIELD |
| relname | String | | NO | | FIELD |
| relnamespace | String | | NO | | FIELD |
| relnamespace | UInt32 | | NO | | FIELD |
| relkind | String | | NO | | FIELD |
| relowner | UInt32 | | NO | | FIELD |
+--------------+--------+-----+------+---------+---------------+
@@ -200,7 +205,7 @@ desc table pg_namespace;
+---------+--------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+---------+--------+-----+------+---------+---------------+
| oid | String | | NO | | FIELD |
| oid | UInt32 | | NO | | FIELD |
| nspname | String | | NO | | FIELD |
+---------+--------+-----+------+---------+---------------+

View File

@@ -32,6 +32,9 @@ WHERE c.relkind IN ('r','p','')
AND pg_catalog.pg_table_is_visible(c.oid)
ORDER BY 1,2;
-- make sure oid of namespace keep stable
SELECT * FROM pg_namespace ORDER BY oid;
create
database my_db;