Compare commits

..

3 Commits

Author SHA1 Message Date
discord9
7e243632c7 fix: dist planner rm col req when rm sort (#7512)
* aha!

Signed-off-by: discord9 <discord9@163.com>

* fix: rm col_req in pql sort

Signed-off-by: discord9 <discord9@163.com>

* ut

Signed-off-by: discord9 <discord9@163.com>

* docs

Signed-off-by: discord9 <discord9@163.com>

* typo

Signed-off-by: discord9 <discord9@163.com>

* more typo

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2026-01-05 03:27:11 +00:00
Ruihang Xia
3556eb4476 chore: add tests to comment column on information_schema (#7514)
* feat: show comment on information_schema

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add to information schema for columns, add sqlness tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove duplications

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update integration test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2026-01-04 09:05:50 +00:00
Weny Xu
9343da7fe8 feat(meta-srv): fallback to non-TLS connection when etcd TLS prefer mode fail (#7507)
* feat(meta-srv): fallback to non-TLS connection when etcd TLS prefer mode fail

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore(ci): set timeout for deploy cluster

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: simplify etcd TLS prefer mode handling

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-12-31 10:03:34 +00:00
15 changed files with 221 additions and 32 deletions

View File

@@ -70,19 +70,23 @@ runs:
--wait \
--wait-for-jobs
- name: Wait for GreptimeDB
shell: bash
run: |
while true; do
PHASE=$(kubectl -n my-greptimedb get gtc my-greptimedb -o jsonpath='{.status.clusterPhase}')
if [ "$PHASE" == "Running" ]; then
echo "Cluster is ready"
break
else
echo "Cluster is not ready yet: Current phase: $PHASE"
kubectl get pods -n my-greptimedb
sleep 5 # wait for 5 seconds before check again.
fi
done
uses: nick-fields/retry@v3
with:
timeout_minutes: 3
max_attempts: 1
shell: bash
command: |
while true; do
PHASE=$(kubectl -n my-greptimedb get gtc my-greptimedb -o jsonpath='{.status.clusterPhase}')
if [ "$PHASE" == "Running" ]; then
echo "Cluster is ready"
break
else
echo "Cluster is not ready yet: Current phase: $PHASE"
kubectl get pods -n my-greptimedb
sleep 5 # wait for 5 seconds before check again.
fi
done
- name: Print GreptimeDB info
if: always()
shell: bash

View File

@@ -399,8 +399,8 @@ impl InformationSchemaColumnsBuilder {
self.is_nullables.push(Some("No"));
}
self.column_types.push(Some(&data_type));
self.column_comments
.push(column_schema.column_comment().map(|x| x.as_ref()));
let column_comment = column_schema.column_comment().map(|x| x.as_ref());
self.column_comments.push(column_comment);
}
fn finish(&mut self) -> Result<RecordBatch> {

View File

@@ -92,7 +92,7 @@ impl StoreConfig {
pub fn tls_config(&self) -> Option<TlsOption> {
if self.backend_tls_mode != TlsMode::Disable {
Some(TlsOption {
mode: self.backend_tls_mode.clone(),
mode: self.backend_tls_mode,
cert_path: self.backend_tls_cert_path.clone(),
key_path: self.backend_tls_key_path.clone(),
ca_cert_path: self.backend_tls_ca_cert_path.clone(),

View File

@@ -236,7 +236,7 @@ impl StartCommand {
};
let tls_opts = TlsOption::new(
self.tls_mode.clone(),
self.tls_mode,
self.tls_cert_path.clone(),
self.tls_key_path.clone(),
self.tls_watch,

View File

@@ -261,7 +261,7 @@ impl StartCommand {
};
let tls_opts = TlsOption::new(
self.tls_mode.clone(),
self.tls_mode,
self.tls_cert_path.clone(),
self.tls_key_path.clone(),
self.tls_watch,

View File

@@ -868,6 +868,8 @@ impl PgStore {
let client = match pool.get().await {
Ok(client) => client,
Err(e) => {
// We need to log the debug for the error to help diagnose the issue.
common_telemetry::error!(e; "Failed to get Postgres connection.");
return GetPostgresConnectionSnafu {
reason: e.to_string(),
}

View File

@@ -299,7 +299,7 @@ impl Default for MetasrvOptions {
#[allow(deprecated)]
server_addr: String::new(),
store_addrs: vec!["127.0.0.1:2379".to_string()],
backend_tls: None,
backend_tls: Some(TlsOption::prefer()),
selector: SelectorType::default(),
enable_region_failover: false,
heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use common_meta::kv_backend::etcd::create_etcd_tls_options;
use common_telemetry::warn;
use etcd_client::{Client, ConnectOptions};
use servers::tls::{TlsMode, TlsOption};
use snafu::ResultExt;
@@ -38,9 +39,12 @@ pub async fn create_etcd_client_with_tls(
client_options.keep_alive_interval,
client_options.keep_alive_timeout,
);
let all_endpoints_use_https = etcd_endpoints.iter().all(|e| e.starts_with("https"));
if let Some(tls_config) = tls_config
&& let Some(tls_options) = create_etcd_tls_options(&convert_tls_option(tls_config))
.context(BuildTlsOptionsSnafu)?
&& let Some(tls_options) =
create_etcd_tls_options(&convert_tls_option(all_endpoints_use_https, tls_config))
.context(BuildTlsOptionsSnafu)?
{
connect_options = connect_options.with_tls(tls_options);
}
@@ -50,9 +54,22 @@ pub async fn create_etcd_client_with_tls(
.context(error::ConnectEtcdSnafu)
}
fn convert_tls_option(tls_option: &TlsOption) -> common_meta::kv_backend::etcd::TlsOption {
fn convert_tls_option(
all_endpoints_use_https: bool,
tls_option: &TlsOption,
) -> common_meta::kv_backend::etcd::TlsOption {
let mode = match tls_option.mode {
TlsMode::Disable => common_meta::kv_backend::etcd::TlsMode::Disable,
TlsMode::Prefer => {
if all_endpoints_use_https {
common_meta::kv_backend::etcd::TlsMode::Require
} else {
warn!(
"All endpoints use HTTP, TLS prefer mode is not supported, using disable mode"
);
common_meta::kv_backend::etcd::TlsMode::Disable
}
}
_ => common_meta::kv_backend::etcd::TlsMode::Require,
};
common_meta::kv_backend::etcd::TlsOption {

View File

@@ -281,18 +281,18 @@ struct PlanRewriter {
/// 2: Sort: t.pk1+t.pk2
/// 3. Projection: t.number, t.pk1, t.pk2
/// ```
/// `Sort` will make a column requirement for `t.pk1` at level 2.
/// `Sort` will make a column requirement for `t.pk1+t.pk2` at level 2.
/// Which making `Projection` at level 1 need to add a ref to `t.pk1` as well.
/// So that the expanded plan will be
/// ```ignore
/// Projection: t.number
/// MergeSort: t.pk1
/// MergeSort: t.pk1+t.pk2
/// MergeScan: remote_input=
/// Projection: t.number, "t.pk1+t.pk2" <--- the original `Projection` at level 1 get added with `t.pk1+t.pk2`
/// Sort: t.pk1+t.pk2
/// Projection: t.number, t.pk1, t.pk2
/// ```
/// Making `MergeSort` can have `t.pk1` as input.
/// Making `MergeSort` can have `t.pk1+t.pk2` as input.
/// Meanwhile `Projection` at level 3 doesn't need to add any new column because 3 > 2
/// and col requirements at level 2 is not applicable for level 3.
///
@@ -392,10 +392,11 @@ impl PlanRewriter {
&& ext_b.node.name() == MergeSortLogicalPlan::name()
{
// revert last `ConditionalCommutative` result for Sort plan in this case.
// `update_column_requirements` left unchanged because Sort won't generate
// new columns or remove existing columns.
// also need to remove any column requirements made by the Sort Plan
// as it may refer to columns later no longer exist(rightfully) like by aggregate or projection
self.stage.pop();
self.expand_on_next_part_cond_trans_commutative = false;
self.column_requirements.clear();
}
}
Commutativity::PartialCommutative => {
@@ -680,6 +681,10 @@ struct EnforceDistRequirementRewriter {
impl EnforceDistRequirementRewriter {
fn new(column_requirements: Vec<(HashSet<Column>, usize)>, cur_level: usize) -> Self {
debug!(
"Create EnforceDistRequirementRewriter with column_requirements: {:?} at cur_level: {}",
column_requirements, cur_level
);
Self {
column_requirements,
cur_level,
@@ -733,7 +738,7 @@ impl EnforceDistRequirementRewriter {
.filter(|a| !a.is_empty())
else {
return Err(datafusion_common::DataFusionError::Internal(format!(
"EnforceDistRequirementRewriter: no alias found for required column {original_col} in child plan {child} from original plan {original}",
"EnforceDistRequirementRewriter: no alias found for required column {original_col} at level {level} in current node's child plan: \n{child} from original plan: \n{original}",
)));
};

View File

@@ -777,6 +777,67 @@ fn expand_step_aggr_proj() {
assert_eq!(expected, result.to_string());
}
/// Make sure that `SeriesDivide` special handling correctly clean up column requirements from it's previous sort
#[test]
fn expand_complex_col_req_sort_pql() {
// use logging for better debugging
init_default_ut_logging();
let test_table = TestTable::table_with_name(0, "t".to_string());
let table_source = Arc::new(DefaultTableSource::new(Arc::new(
DfTableProviderAdapter::new(test_table),
)));
let plan = LogicalPlanBuilder::scan_with_filters("t", table_source.clone(), None, vec![])
.unwrap()
.sort(vec![
col("pk1").sort(true, false),
col("pk2").sort(true, false),
col("pk3").sort(true, false), // make some col req here
])
.unwrap()
.build()
.unwrap();
let plan = SeriesDivide::new(
vec!["pk1".to_string(), "pk2".to_string(), "pk3".to_string()],
"ts".to_string(),
plan,
);
let plan = LogicalPlan::Extension(datafusion_expr::Extension {
node: Arc::new(plan),
});
let plan = LogicalPlanBuilder::from(plan)
.aggregate(vec![col("pk1"), col("pk2")], vec![min(col("number"))])
.unwrap()
.sort(vec![
col("pk1").sort(true, false),
col("pk2").sort(true, false),
])
.unwrap()
.project(vec![col("pk1"), col("pk2")])
.unwrap()
.build()
.unwrap();
let config = ConfigOptions::default();
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = [
"Projection: t.pk1, t.pk2",
" MergeSort: t.pk1 ASC NULLS LAST, t.pk2 ASC NULLS LAST",
" MergeScan [is_placeholder=false, remote_input=[",
"Projection: t.pk1, t.pk2",
" Sort: t.pk1 ASC NULLS LAST, t.pk2 ASC NULLS LAST",
" Aggregate: groupBy=[[t.pk1, t.pk2]], aggr=[[min(t.number)]]",
r#" PromSeriesDivide: tags=["pk1", "pk2", "pk3"]"#,
" Sort: t.pk1 ASC NULLS LAST, t.pk2 ASC NULLS LAST, t.pk3 ASC NULLS LAST",
" TableScan: t",
"]]",
]
.join("\n");
assert_eq!(expected, result.to_string());
}
/// should only expand `Sort`, notice `Sort` before `Aggregate` usually can and
/// will be optimized out, and dist planner shouldn't handle that case, but
/// for now, still handle that be expanding the `Sort` node

View File

@@ -144,7 +144,7 @@ impl Categorizer {
}
}
// all group by expressions are partition columns can push down, unless
// another push down(including `Limit` or `Sort`) is already in progress(which will then prvent next cond commutative node from being push down).
// another push down(including `Limit` or `Sort`) is already in progress(which will then prevent next cond commutative node from being push down).
// TODO(discord9): This is a temporary solution(that works), a better description of
// commutativity is needed under this situation.
Commutativity::ConditionalCommutative(None)

View File

@@ -234,7 +234,7 @@ impl QueryEngineState {
rules.retain(|rule| rule.name() != name);
}
/// Optimize the logical plan by the extension anayzer rules.
/// Optimize the logical plan by the extension analyzer rules.
pub fn optimize_by_extension_rules(
&self,
plan: DfLogicalPlan,

View File

@@ -29,7 +29,7 @@ use strum::EnumString;
use crate::error::{InternalIoSnafu, Result};
/// TlsMode is used for Mysql and Postgres server start up.
#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq, EnumString)]
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, EnumString)]
#[serde(rename_all = "snake_case")]
pub enum TlsMode {
#[default]
@@ -91,6 +91,17 @@ impl TlsOption {
tls_option
}
/// Creates a new TLS option with the prefer mode.
pub fn prefer() -> Self {
Self {
mode: TlsMode::Prefer,
cert_path: String::new(),
key_path: String::new(),
ca_cert_path: String::new(),
watch: false,
}
}
/// Validates the TLS configuration.
///
/// Returns an error if:

View File

@@ -32,6 +32,17 @@ SHOW CREATE TABLE comment_table_test;
| | ) |
+--------------------+---------------------------------------------------+
SELECT table_comment
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'comment_table_test';
+-------------------------+
| table_comment |
+-------------------------+
| table level description |
+-------------------------+
-- Remove table comment
COMMENT ON TABLE comment_table_test IS NULL;
@@ -54,6 +65,17 @@ SHOW CREATE TABLE comment_table_test;
| | |
+--------------------+---------------------------------------------------+
SELECT table_comment
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'comment_table_test';
+---------------+
| table_comment |
+---------------+
| |
+---------------+
DROP TABLE comment_table_test;
Affected Rows: 0
@@ -90,6 +112,18 @@ SHOW CREATE TABLE comment_column_test;
| | |
+---------------------+---------------------------------------------------------+
SELECT column_comment
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'comment_column_test'
AND column_name = 'val';
+--------------------------+
| column_comment |
+--------------------------+
| value column description |
+--------------------------+
-- Remove column comment
COMMENT ON COLUMN comment_column_test.val IS NULL;
@@ -112,6 +146,18 @@ SHOW CREATE TABLE comment_column_test;
| | |
+---------------------+----------------------------------------------------+
SELECT column_comment
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'comment_column_test'
AND column_name = 'val';
+----------------+
| column_comment |
+----------------+
| |
+----------------+
DROP TABLE comment_column_test;
Affected Rows: 0
@@ -155,6 +201,16 @@ SHOW CREATE FLOW flow_comment_test;
| | AS SELECT desc_str, ts FROM flow_source_comment_test |
+-------------------+------------------------------------------------------+
SELECT comment
FROM information_schema.flows
WHERE flow_name = 'flow_comment_test';
+------------------------+
| comment |
+------------------------+
| flow level description |
+------------------------+
-- Remove flow comment
COMMENT ON FLOW flow_comment_test IS NULL;
@@ -170,6 +226,16 @@ SHOW CREATE FLOW flow_comment_test;
| | AS SELECT desc_str, ts FROM flow_source_comment_test |
+-------------------+------------------------------------------------------+
SELECT comment
FROM information_schema.flows
WHERE flow_name = 'flow_comment_test';
+---------+
| comment |
+---------+
| |
+---------+
DROP FLOW flow_comment_test;
Affected Rows: 0

View File

@@ -9,10 +9,18 @@ CREATE TABLE comment_table_test (
-- Add table comment
COMMENT ON TABLE comment_table_test IS 'table level description';
SHOW CREATE TABLE comment_table_test;
SELECT table_comment
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'comment_table_test';
-- Remove table comment
COMMENT ON TABLE comment_table_test IS NULL;
SHOW CREATE TABLE comment_table_test;
SELECT table_comment
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'comment_table_test';
DROP TABLE comment_table_test;
@@ -27,10 +35,20 @@ CREATE TABLE comment_column_test (
-- Add column comment
COMMENT ON COLUMN comment_column_test.val IS 'value column description';
SHOW CREATE TABLE comment_column_test;
SELECT column_comment
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'comment_column_test'
AND column_name = 'val';
-- Remove column comment
COMMENT ON COLUMN comment_column_test.val IS NULL;
SHOW CREATE TABLE comment_column_test;
SELECT column_comment
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'comment_column_test'
AND column_name = 'val';
DROP TABLE comment_column_test;
@@ -54,12 +72,17 @@ SELECT desc_str, ts FROM flow_source_comment_test;
-- Add flow comment
COMMENT ON FLOW flow_comment_test IS 'flow level description';
SHOW CREATE FLOW flow_comment_test;
SELECT comment
FROM information_schema.flows
WHERE flow_name = 'flow_comment_test';
-- Remove flow comment
COMMENT ON FLOW flow_comment_test IS NULL;
SHOW CREATE FLOW flow_comment_test;
SELECT comment
FROM information_schema.flows
WHERE flow_name = 'flow_comment_test';
DROP FLOW flow_comment_test;
DROP TABLE flow_source_comment_test;
DROP TABLE flow_sink_comment_test;