diff --git a/Cargo.lock b/Cargo.lock
index d483ec7088..c5b8fc016a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1812,10 +1812,12 @@ name = "common-base"
version = "0.9.3"
dependencies = [
"anymap",
+ "async-trait",
"bitvec",
"bytes",
"common-error",
"common-macro",
+ "futures",
"paste",
"serde",
"snafu 0.8.4",
@@ -1952,6 +1954,7 @@ dependencies = [
"datatypes",
"geohash",
"h3o",
+ "jsonb",
"num",
"num-traits",
"once_cell",
@@ -2293,6 +2296,7 @@ dependencies = [
"common-telemetry",
"futures-util",
"humantime-serde",
+ "num_cpus",
"rskafka",
"rustls 0.23.10",
"rustls-native-certs",
@@ -3166,6 +3170,7 @@ dependencies = [
"datafusion-common",
"enum_dispatch",
"greptime-proto",
+ "jsonb",
"num",
"num-traits",
"ordered-float 3.9.2",
@@ -3698,6 +3703,12 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
+[[package]]
+name = "fast-float"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "95765f67b4b18863968b4a1bd5bb576f732b29a4a28c7cd84c09fa3e2875f33c"
+
[[package]]
name = "fastdivide"
version = "0.4.1"
@@ -4302,7 +4313,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=157cfdb52709e489cf1f3ce8e3042ed4ee8a524a#157cfdb52709e489cf1f3ce8e3042ed4ee8a524a"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=973f49cde88a582fb65755cc572ebcf6fb93ccf7#973f49cde88a582fb65755cc572ebcf6fb93ccf7"
dependencies = [
"prost 0.12.6",
"serde",
@@ -5409,6 +5420,21 @@ dependencies = [
"serde",
]
+[[package]]
+name = "jsonb"
+version = "0.4.1"
+source = "git+https://github.com/CookiePieWw/jsonb.git?rev=d0166c130fce903bf6c58643417a3173a6172d31#d0166c130fce903bf6c58643417a3173a6172d31"
+dependencies = [
+ "byteorder",
+ "fast-float",
+ "itoa",
+ "nom",
+ "ordered-float 4.2.0",
+ "rand",
+ "ryu",
+ "serde_json",
+]
+
[[package]]
name = "jsonpath-rust"
version = "0.5.1"
@@ -8062,6 +8088,8 @@ dependencies = [
"chrono",
"fallible-iterator",
"postgres-protocol",
+ "serde",
+ "serde_json",
]
[[package]]
@@ -10400,6 +10428,7 @@ dependencies = [
"hyper 0.14.29",
"influxdb_line_protocol",
"itertools 0.10.5",
+ "jsonb",
"lazy_static",
"mime_guess",
"mysql_async",
@@ -10779,6 +10808,7 @@ dependencies = [
"hex",
"iso8601",
"itertools 0.10.5",
+ "jsonb",
"lazy_static",
"regex",
"serde_json",
diff --git a/Cargo.toml b/Cargo.toml
index 93ea8db134..d412bf7e97 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -120,10 +120,11 @@ etcd-client = { version = "0.13" }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "157cfdb52709e489cf1f3ce8e3042ed4ee8a524a" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "973f49cde88a582fb65755cc572ebcf6fb93ccf7" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
+jsonb = { git = "https://github.com/CookiePieWw/jsonb.git", rev = "d0166c130fce903bf6c58643417a3173a6172d31", default-features = false }
lazy_static = "1.4"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd" }
mockall = "0.11.4"
diff --git a/config/config.md b/config/config.md
index f0ee9e54f8..a792be5de5 100644
--- a/config/config.md
+++ b/config/config.md
@@ -68,6 +68,7 @@
| `wal.enable_log_recycle` | Bool | `true` | Whether to reuse logically truncated log files.
**It's only used when the provider is `raft_engine`**. |
| `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.
**It's only used when the provider is `raft_engine`**. |
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.
**It's only used when the provider is `raft_engine`**. |
+| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.
**It's only used when the provider is `kafka`**. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.num_topics` | Integer | `64` | Number of topics.
**It's only used when the provider is `kafka`**. |
@@ -381,6 +382,7 @@
| `wal.enable_log_recycle` | Bool | `true` | Whether to reuse logically truncated log files.
**It's only used when the provider is `raft_engine`**. |
| `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.
**It's only used when the provider is `raft_engine`**. |
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.
**It's only used when the provider is `raft_engine`**. |
+| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.
**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.
Warning: Kafka has a default limit of 1MB per message in a topic.
**It's only used when the provider is `kafka`**. |
| `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.
**It's only used when the provider is `kafka`**. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 07c1df3e2a..14fbf914e7 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -170,6 +170,9 @@ prefill_log_files = false
## **It's only used when the provider is `raft_engine`**.
sync_period = "10s"
+## Parallelism during WAL recovery.
+recovery_parallelism = 2
+
## The Kafka broker endpoints.
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index f36c0e2904..f7c7b2af29 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -174,6 +174,9 @@ prefill_log_files = false
## **It's only used when the provider is `raft_engine`**.
sync_period = "10s"
+## Parallelism during WAL recovery.
+recovery_parallelism = 2
+
## The Kafka broker endpoints.
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]
diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs
index d8e9c524d8..101cae8802 100644
--- a/src/api/src/helper.rs
+++ b/src/api/src/helper.rs
@@ -42,7 +42,8 @@ use greptime_proto::v1::greptime_request::Request;
use greptime_proto::v1::query_request::Query;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{
- ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, QueryRequest, Row, SemanticType,
+ ColumnDataTypeExtension, DdlRequest, DecimalTypeExtension, JsonTypeExtension, QueryRequest,
+ Row, SemanticType,
};
use paste::paste;
use snafu::prelude::*;
@@ -103,7 +104,17 @@ impl From for ConcreteDataType {
ColumnDataType::Uint64 => ConcreteDataType::uint64_datatype(),
ColumnDataType::Float32 => ConcreteDataType::float32_datatype(),
ColumnDataType::Float64 => ConcreteDataType::float64_datatype(),
- ColumnDataType::Binary => ConcreteDataType::binary_datatype(),
+ ColumnDataType::Binary => {
+ if let Some(TypeExt::JsonType(_)) = datatype_wrapper
+ .datatype_ext
+ .as_ref()
+ .and_then(|datatype_ext| datatype_ext.type_ext.as_ref())
+ {
+ ConcreteDataType::json_datatype()
+ } else {
+ ConcreteDataType::binary_datatype()
+ }
+ }
ColumnDataType::String => ConcreteDataType::string_datatype(),
ColumnDataType::Date => ConcreteDataType::date_datatype(),
ColumnDataType::Datetime => ConcreteDataType::datetime_datatype(),
@@ -236,7 +247,7 @@ impl TryFrom for ColumnDataTypeWrapper {
ConcreteDataType::UInt64(_) => ColumnDataType::Uint64,
ConcreteDataType::Float32(_) => ColumnDataType::Float32,
ConcreteDataType::Float64(_) => ColumnDataType::Float64,
- ConcreteDataType::Binary(_) => ColumnDataType::Binary,
+ ConcreteDataType::Binary(_) | ConcreteDataType::Json(_) => ColumnDataType::Binary,
ConcreteDataType::String(_) => ColumnDataType::String,
ConcreteDataType::Date(_) => ColumnDataType::Date,
ConcreteDataType::DateTime(_) => ColumnDataType::Datetime,
@@ -276,6 +287,16 @@ impl TryFrom for ColumnDataTypeWrapper {
})),
})
}
+ ColumnDataType::Binary => {
+ if datatype == ConcreteDataType::json_datatype() {
+ // Json is the same as binary in proto. The extension marks the binary in proto is actually a json.
+ Some(ColumnDataTypeExtension {
+ type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
+ })
+ } else {
+ None
+ }
+ }
_ => None,
};
Ok(Self {
@@ -649,7 +670,8 @@ pub fn pb_values_to_vector_ref(data_type: &ConcreteDataType, values: Values) ->
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_)
- | ConcreteDataType::Duration(_) => {
+ | ConcreteDataType::Duration(_)
+ | ConcreteDataType::Json(_) => {
unreachable!()
}
}
@@ -813,7 +835,8 @@ pub fn pb_values_to_values(data_type: &ConcreteDataType, values: Values) -> Vec<
ConcreteDataType::Null(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_)
- | ConcreteDataType::Duration(_) => {
+ | ConcreteDataType::Duration(_)
+ | ConcreteDataType::Json(_) => {
unreachable!()
}
}
@@ -831,7 +854,13 @@ pub fn is_column_type_value_eq(
expect_type: &ConcreteDataType,
) -> bool {
ColumnDataTypeWrapper::try_new(type_value, type_extension)
- .map(|wrapper| ConcreteDataType::from(wrapper) == *expect_type)
+ .map(|wrapper| {
+ let datatype = ConcreteDataType::from(wrapper);
+ (datatype == *expect_type)
+ // Json type leverage binary type in pb, so this is valid.
+ || (datatype == ConcreteDataType::binary_datatype()
+ && *expect_type == ConcreteDataType::json_datatype())
+ })
.unwrap_or(false)
}
diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs
index d39e1abdb9..feb5e31d09 100644
--- a/src/catalog/src/kvbackend/manager.rs
+++ b/src/catalog/src/kvbackend/manager.rs
@@ -36,6 +36,7 @@ use futures_util::{StreamExt, TryStreamExt};
use meta_client::client::MetaClient;
use moka::sync::Cache;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
+use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use table::dist_table::DistTable;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
@@ -152,7 +153,11 @@ impl CatalogManager for KvBackendCatalogManager {
Ok(keys)
}
- async fn schema_names(&self, catalog: &str) -> Result> {
+ async fn schema_names(
+ &self,
+ catalog: &str,
+ query_ctx: Option<&QueryContext>,
+ ) -> Result> {
let stream = self
.table_metadata_manager
.schema_manager()
@@ -163,12 +168,17 @@ impl CatalogManager for KvBackendCatalogManager {
.map_err(BoxedError::new)
.context(ListSchemasSnafu { catalog })?;
- keys.extend(self.system_catalog.schema_names());
+ keys.extend(self.system_catalog.schema_names(query_ctx));
Ok(keys.into_iter().collect())
}
- async fn table_names(&self, catalog: &str, schema: &str) -> Result> {
+ async fn table_names(
+ &self,
+ catalog: &str,
+ schema: &str,
+ query_ctx: Option<&QueryContext>,
+ ) -> Result> {
let stream = self
.table_metadata_manager
.table_name_manager()
@@ -181,7 +191,7 @@ impl CatalogManager for KvBackendCatalogManager {
.into_iter()
.map(|(k, _)| k)
.collect::>();
- tables.extend_from_slice(&self.system_catalog.table_names(schema));
+ tables.extend_from_slice(&self.system_catalog.table_names(schema, query_ctx));
Ok(tables.into_iter().collect())
}
@@ -194,8 +204,13 @@ impl CatalogManager for KvBackendCatalogManager {
.context(TableMetadataManagerSnafu)
}
- async fn schema_exists(&self, catalog: &str, schema: &str) -> Result {
- if self.system_catalog.schema_exists(schema) {
+ async fn schema_exists(
+ &self,
+ catalog: &str,
+ schema: &str,
+ query_ctx: Option<&QueryContext>,
+ ) -> Result {
+ if self.system_catalog.schema_exists(schema, query_ctx) {
return Ok(true);
}
@@ -206,8 +221,14 @@ impl CatalogManager for KvBackendCatalogManager {
.context(TableMetadataManagerSnafu)
}
- async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result {
- if self.system_catalog.table_exists(schema, table) {
+ async fn table_exists(
+ &self,
+ catalog: &str,
+ schema: &str,
+ table: &str,
+ query_ctx: Option<&QueryContext>,
+ ) -> Result {
+ if self.system_catalog.table_exists(schema, table, query_ctx) {
return Ok(true);
}
@@ -225,10 +246,12 @@ impl CatalogManager for KvBackendCatalogManager {
catalog_name: &str,
schema_name: &str,
table_name: &str,
+ query_ctx: Option<&QueryContext>,
) -> Result