Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-05 04:42:56 +00:00.
Compare commits: revert-749...main

3 Commits:
- 7e243632c7
- 3556eb4476
- 9343da7fe8
```diff
@@ -70,19 +70,23 @@ runs:
           --wait \
           --wait-for-jobs
     - name: Wait for GreptimeDB
-      shell: bash
-      run: |
-        while true; do
-          PHASE=$(kubectl -n my-greptimedb get gtc my-greptimedb -o jsonpath='{.status.clusterPhase}')
-          if [ "$PHASE" == "Running" ]; then
-            echo "Cluster is ready"
-            break
-          else
-            echo "Cluster is not ready yet: Current phase: $PHASE"
-            kubectl get pods -n my-greptimedb
-            sleep 5 # wait for 5 seconds before check again.
-          fi
-        done
+      uses: nick-fields/retry@v3
+      with:
+        timeout_minutes: 3
+        max_attempts: 1
+        shell: bash
+        command: |
+          while true; do
+            PHASE=$(kubectl -n my-greptimedb get gtc my-greptimedb -o jsonpath='{.status.clusterPhase}')
+            if [ "$PHASE" == "Running" ]; then
+              echo "Cluster is ready"
+              break
+            else
+              echo "Cluster is not ready yet: Current phase: $PHASE"
+              kubectl get pods -n my-greptimedb
+              sleep 5 # wait for 5 seconds before check again.
+            fi
+          done
     - name: Print GreptimeDB info
       if: always()
       shell: bash
```
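The old `run:` step could spin forever if the cluster never reached `Running`; wrapping the same loop in `nick-fields/retry@v3` with `timeout_minutes: 3` turns a hung rollout into a prompt job failure. A minimal Rust sketch of the same bounded-poll idea (a hypothetical helper, not code from this repo):

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Polls `is_ready` every `interval`, giving up once `timeout` elapses.
/// This is the bound that `timeout_minutes: 3` adds to the shell loop above.
fn wait_until_ready(
    mut is_ready: impl FnMut() -> bool,
    interval: Duration,
    timeout: Duration,
) -> Result<(), String> {
    let deadline = Instant::now() + timeout;
    loop {
        if is_ready() {
            return Ok(());
        }
        if Instant::now() >= deadline {
            return Err("timed out waiting for cluster readiness".to_string());
        }
        sleep(interval);
    }
}

fn main() {
    // Stand-in readiness check; the workflow shells out to kubectl instead.
    let mut calls = 0;
    let ready = wait_until_ready(
        || {
            calls += 1;
            calls >= 3
        },
        Duration::from_millis(10),
        Duration::from_secs(1),
    );
    assert!(ready.is_ok());
}
```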
```diff
@@ -399,8 +399,8 @@ impl InformationSchemaColumnsBuilder {
             self.is_nullables.push(Some("No"));
         }
         self.column_types.push(Some(&data_type));
-        self.column_comments
-            .push(column_schema.column_comment().map(|x| x.as_ref()));
+        let column_comment = column_schema.column_comment().map(|x| x.as_ref());
+        self.column_comments.push(column_comment);
     }

     fn finish(&mut self) -> Result<RecordBatch> {
```
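The rewrite binds the mapped comment to a local before pushing it, which makes the intermediate `Option<&str>` explicit. A standalone sketch of what that mapping computes, using hypothetical minimal types rather than the real `ColumnSchema` API:

```rust
fn main() {
    // Stand-in for `column_schema.column_comment()`: an optional owned comment.
    let comment_src: Option<String> = Some("value column description".to_string());
    let mut column_comments: Vec<Option<&str>> = Vec::new();

    // Equivalent of `.map(|x| x.as_ref())`: borrow the String as &str
    // without cloning it.
    let column_comment: Option<&str> = comment_src.as_ref().map(|x| x.as_ref());
    column_comments.push(column_comment);

    assert_eq!(column_comments, vec![Some("value column description")]);
}
```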
```diff
@@ -92,7 +92,7 @@ impl StoreConfig {
     pub fn tls_config(&self) -> Option<TlsOption> {
         if self.backend_tls_mode != TlsMode::Disable {
             Some(TlsOption {
-                mode: self.backend_tls_mode.clone(),
+                mode: self.backend_tls_mode,
                 cert_path: self.backend_tls_cert_path.clone(),
                 key_path: self.backend_tls_key_path.clone(),
                 ca_cert_path: self.backend_tls_ca_cert_path.clone(),
```
```diff
@@ -236,7 +236,7 @@ impl StartCommand {
         };

         let tls_opts = TlsOption::new(
-            self.tls_mode.clone(),
+            self.tls_mode,
             self.tls_cert_path.clone(),
             self.tls_key_path.clone(),
             self.tls_watch,
```
```diff
@@ -261,7 +261,7 @@ impl StartCommand {
         };

         let tls_opts = TlsOption::new(
-            self.tls_mode.clone(),
+            self.tls_mode,
             self.tls_cert_path.clone(),
             self.tls_key_path.clone(),
             self.tls_watch,
```
```diff
@@ -868,6 +868,8 @@ impl PgStore {
         let client = match pool.get().await {
             Ok(client) => client,
             Err(e) => {
+                // We need to log the debug for the error to help diagnose the issue.
+                common_telemetry::error!(e; "Failed to get Postgres connection.");
                 return GetPostgresConnectionSnafu {
                     reason: e.to_string(),
                 }
```
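The two added lines log the connection failure before it is flattened into the returned error string, so the full error detail still reaches telemetry. A sketch of the log-then-convert pattern with hypothetical stand-in types (the real code uses `common_telemetry::error!` and a snafu context selector):

```rust
#[derive(Debug)]
struct GetConnectionError {
    reason: String,
}

fn get_client(pool_result: Result<u32, String>) -> Result<u32, GetConnectionError> {
    match pool_result {
        Ok(client) => Ok(client),
        Err(e) => {
            // Log first, while the full error value is still available ...
            eprintln!("Failed to get Postgres connection: {e}");
            // ... then map it into the API-level error, as the diff does with
            // GetPostgresConnectionSnafu { reason: e.to_string() }.
            Err(GetConnectionError { reason: e })
        }
    }
}

fn main() {
    let err = get_client(Err("pool exhausted".to_string())).unwrap_err();
    assert_eq!(err.reason, "pool exhausted");
}
```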
```diff
@@ -299,7 +299,7 @@ impl Default for MetasrvOptions {
             #[allow(deprecated)]
             server_addr: String::new(),
             store_addrs: vec!["127.0.0.1:2379".to_string()],
-            backend_tls: None,
+            backend_tls: Some(TlsOption::prefer()),
             selector: SelectorType::default(),
             enable_region_failover: false,
             heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
```
```diff
@@ -13,6 +13,7 @@
 // limitations under the License.

 use common_meta::kv_backend::etcd::create_etcd_tls_options;
+use common_telemetry::warn;
 use etcd_client::{Client, ConnectOptions};
 use servers::tls::{TlsMode, TlsOption};
 use snafu::ResultExt;
```
```diff
@@ -38,9 +39,12 @@ pub async fn create_etcd_client_with_tls(
         client_options.keep_alive_interval,
         client_options.keep_alive_timeout,
     );
+
+    let all_endpoints_use_https = etcd_endpoints.iter().all(|e| e.starts_with("https"));
     if let Some(tls_config) = tls_config
-        && let Some(tls_options) = create_etcd_tls_options(&convert_tls_option(tls_config))
-            .context(BuildTlsOptionsSnafu)?
+        && let Some(tls_options) =
+            create_etcd_tls_options(&convert_tls_option(all_endpoints_use_https, tls_config))
+                .context(BuildTlsOptionsSnafu)?
     {
         connect_options = connect_options.with_tls(tls_options);
     }
```
```diff
@@ -50,9 +54,22 @@ pub async fn create_etcd_client_with_tls(
         .context(error::ConnectEtcdSnafu)
 }

-fn convert_tls_option(tls_option: &TlsOption) -> common_meta::kv_backend::etcd::TlsOption {
+fn convert_tls_option(
+    all_endpoints_use_https: bool,
+    tls_option: &TlsOption,
+) -> common_meta::kv_backend::etcd::TlsOption {
     let mode = match tls_option.mode {
         TlsMode::Disable => common_meta::kv_backend::etcd::TlsMode::Disable,
+        TlsMode::Prefer => {
+            if all_endpoints_use_https {
+                common_meta::kv_backend::etcd::TlsMode::Require
+            } else {
+                warn!(
+                    "All endpoints use HTTP, TLS prefer mode is not supported, using disable mode"
+                );
+                common_meta::kv_backend::etcd::TlsMode::Disable
+            }
+        }
         _ => common_meta::kv_backend::etcd::TlsMode::Require,
     };
     common_meta::kv_backend::etcd::TlsOption {
```
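Prefer mode now resolves against the endpoint schemes: it upgrades to require only when every endpoint is `https`, and otherwise falls back to disable with a warning. A self-contained sketch of that decision (simplified enum, not the real `common_meta` types):

```rust
#[derive(Debug, PartialEq, Eq)]
enum EtcdTlsMode {
    Disable,
    Require,
}

/// Mirrors the `TlsMode::Prefer` arm added in the diff above.
fn resolve_prefer(endpoints: &[&str]) -> EtcdTlsMode {
    let all_https = endpoints.iter().all(|e| e.starts_with("https"));
    if all_https {
        EtcdTlsMode::Require
    } else {
        // The real code emits a warn! here before downgrading.
        EtcdTlsMode::Disable
    }
}

fn main() {
    assert_eq!(
        resolve_prefer(&["https://etcd-0:2379", "https://etcd-1:2379"]),
        EtcdTlsMode::Require
    );
    // A single plain-HTTP endpoint downgrades prefer mode entirely.
    assert_eq!(
        resolve_prefer(&["https://etcd-0:2379", "http://etcd-1:2379"]),
        EtcdTlsMode::Disable
    );
}
```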
```diff
@@ -281,18 +281,18 @@ struct PlanRewriter {
 /// 2: Sort: t.pk1+t.pk2
 /// 3. Projection: t.number, t.pk1, t.pk2
 /// ```
-/// `Sort` will make a column requirement for `t.pk1` at level 2.
+/// `Sort` will make a column requirement for `t.pk1+t.pk2` at level 2.
 /// Which making `Projection` at level 1 need to add a ref to `t.pk1` as well.
 /// So that the expanded plan will be
 /// ```ignore
 /// Projection: t.number
-///   MergeSort: t.pk1
+///   MergeSort: t.pk1+t.pk2
 ///     MergeScan: remote_input=
 /// Projection: t.number, "t.pk1+t.pk2" <--- the original `Projection` at level 1 get added with `t.pk1+t.pk2`
 ///   Sort: t.pk1+t.pk2
 ///     Projection: t.number, t.pk1, t.pk2
 /// ```
-/// Making `MergeSort` can have `t.pk1` as input.
+/// Making `MergeSort` can have `t.pk1+t.pk2` as input.
 /// Meanwhile `Projection` at level 3 doesn't need to add any new column because 3 > 2
 /// and col requirements at level 2 is not applicable for level 3.
 ///
```
```diff
@@ -392,10 +392,11 @@ impl PlanRewriter {
             && ext_b.node.name() == MergeSortLogicalPlan::name()
         {
             // revert last `ConditionalCommutative` result for Sort plan in this case.
-            // `update_column_requirements` left unchanged because Sort won't generate
-            // new columns or remove existing columns.
+            // also need to remove any column requirements made by the Sort Plan
+            // as it may refer to columns later no longer exist(rightfully) like by aggregate or projection
             self.stage.pop();
             self.expand_on_next_part_cond_trans_commutative = false;
+            self.column_requirements.clear();
         }
     }
     Commutativity::PartialCommutative => {
```
```diff
@@ -680,6 +681,10 @@ struct EnforceDistRequirementRewriter {

 impl EnforceDistRequirementRewriter {
     fn new(column_requirements: Vec<(HashSet<Column>, usize)>, cur_level: usize) -> Self {
+        debug!(
+            "Create EnforceDistRequirementRewriter with column_requirements: {:?} at cur_level: {}",
+            column_requirements, cur_level
+        );
         Self {
             column_requirements,
             cur_level,
```
```diff
@@ -733,7 +738,7 @@ impl EnforceDistRequirementRewriter {
             .filter(|a| !a.is_empty())
         else {
             return Err(datafusion_common::DataFusionError::Internal(format!(
-                "EnforceDistRequirementRewriter: no alias found for required column {original_col} in child plan {child} from original plan {original}",
+                "EnforceDistRequirementRewriter: no alias found for required column {original_col} at level {level} in current node's child plan: \n{child} from original plan: \n{original}",
             )));
         };

```
```diff
@@ -777,6 +777,67 @@ fn expand_step_aggr_proj() {
     assert_eq!(expected, result.to_string());
 }

+/// Make sure that `SeriesDivide` special handling correctly clean up column requirements from it's previous sort
+#[test]
+fn expand_complex_col_req_sort_pql() {
+    // use logging for better debugging
+    init_default_ut_logging();
+    let test_table = TestTable::table_with_name(0, "t".to_string());
+    let table_source = Arc::new(DefaultTableSource::new(Arc::new(
+        DfTableProviderAdapter::new(test_table),
+    )));
+
+    let plan = LogicalPlanBuilder::scan_with_filters("t", table_source.clone(), None, vec![])
+        .unwrap()
+        .sort(vec![
+            col("pk1").sort(true, false),
+            col("pk2").sort(true, false),
+            col("pk3").sort(true, false), // make some col req here
+        ])
+        .unwrap()
+        .build()
+        .unwrap();
+    let plan = SeriesDivide::new(
+        vec!["pk1".to_string(), "pk2".to_string(), "pk3".to_string()],
+        "ts".to_string(),
+        plan,
+    );
+    let plan = LogicalPlan::Extension(datafusion_expr::Extension {
+        node: Arc::new(plan),
+    });
+
+    let plan = LogicalPlanBuilder::from(plan)
+        .aggregate(vec![col("pk1"), col("pk2")], vec![min(col("number"))])
+        .unwrap()
+        .sort(vec![
+            col("pk1").sort(true, false),
+            col("pk2").sort(true, false),
+        ])
+        .unwrap()
+        .project(vec![col("pk1"), col("pk2")])
+        .unwrap()
+        .build()
+        .unwrap();
+
+    let config = ConfigOptions::default();
+    let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
+
+    let expected = [
+        "Projection: t.pk1, t.pk2",
+        "  MergeSort: t.pk1 ASC NULLS LAST, t.pk2 ASC NULLS LAST",
+        "    MergeScan [is_placeholder=false, remote_input=[",
+        "Projection: t.pk1, t.pk2",
+        "  Sort: t.pk1 ASC NULLS LAST, t.pk2 ASC NULLS LAST",
+        "    Aggregate: groupBy=[[t.pk1, t.pk2]], aggr=[[min(t.number)]]",
+        r#"      PromSeriesDivide: tags=["pk1", "pk2", "pk3"]"#,
+        "        Sort: t.pk1 ASC NULLS LAST, t.pk2 ASC NULLS LAST, t.pk3 ASC NULLS LAST",
+        "          TableScan: t",
+        "]]",
+    ]
+    .join("\n");
+    assert_eq!(expected, result.to_string());
+}
+
 /// should only expand `Sort`, notice `Sort` before `Aggregate` usually can and
 /// will be optimized out, and dist planner shouldn't handle that case, but
 /// for now, still handle that be expanding the `Sort` node
```
```diff
@@ -144,7 +144,7 @@ impl Categorizer {
         }
     }
     // all group by expressions are partition columns can push down, unless
-    // another push down(including `Limit` or `Sort`) is already in progress(which will then prvent next cond commutative node from being push down).
+    // another push down(including `Limit` or `Sort`) is already in progress(which will then prevent next cond commutative node from being push down).
     // TODO(discord9): This is a temporary solution(that works), a better description of
     // commutativity is needed under this situation.
     Commutativity::ConditionalCommutative(None)
```
```diff
@@ -234,7 +234,7 @@ impl QueryEngineState {
         rules.retain(|rule| rule.name() != name);
     }

-    /// Optimize the logical plan by the extension anayzer rules.
+    /// Optimize the logical plan by the extension analyzer rules.
     pub fn optimize_by_extension_rules(
         &self,
         plan: DfLogicalPlan,
```
```diff
@@ -29,7 +29,7 @@ use strum::EnumString;
 use crate::error::{InternalIoSnafu, Result};

 /// TlsMode is used for Mysql and Postgres server start up.
-#[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq, EnumString)]
+#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, EnumString)]
 #[serde(rename_all = "snake_case")]
 pub enum TlsMode {
     #[default]
```
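Adding `Copy` is what lets the earlier hunks drop `.clone()` on `tls_mode` and `backend_tls_mode`: a fieldless enum is just a discriminant, so duplicating it is a bitwise copy. A small sketch with stand-in types:

```rust
#[allow(dead_code)]
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum TlsMode {
    #[default]
    Disable,
    Prefer,
    Require,
}

struct StoreConfig {
    backend_tls_mode: TlsMode,
}

impl StoreConfig {
    fn mode(&self) -> TlsMode {
        // With Copy this reads a bitwise copy out of `self`; without it,
        // moving out of a shared reference would not compile, so the code
        // previously needed `self.backend_tls_mode.clone()`.
        self.backend_tls_mode
    }
}

fn main() {
    let config = StoreConfig { backend_tls_mode: TlsMode::Prefer };
    assert_eq!(config.mode(), TlsMode::Prefer);
}
```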
```diff
@@ -91,6 +91,17 @@ impl TlsOption {
         tls_option
     }

+    /// Creates a new TLS option with the prefer mode.
+    pub fn prefer() -> Self {
+        Self {
+            mode: TlsMode::Prefer,
+            cert_path: String::new(),
+            key_path: String::new(),
+            ca_cert_path: String::new(),
+            watch: false,
+        }
+    }
+
     /// Validates the TLS configuration.
     ///
     /// Returns an error if:
```
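With this constructor in place, the `MetasrvOptions` default shown earlier can switch from `backend_tls: None` to `Some(TlsOption::prefer())`. A self-contained sketch of the constructor and its use (stand-in definitions, not the repo's):

```rust
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TlsMode {
    Disable,
    Prefer,
    Require,
}

#[allow(dead_code)]
#[derive(Debug)]
struct TlsOption {
    mode: TlsMode,
    cert_path: String,
    key_path: String,
    ca_cert_path: String,
    watch: bool,
}

impl TlsOption {
    /// Prefer-mode TLS with no certificate material, as in the diff above.
    fn prefer() -> Self {
        Self {
            mode: TlsMode::Prefer,
            cert_path: String::new(),
            key_path: String::new(),
            ca_cert_path: String::new(),
            watch: false,
        }
    }
}

fn main() {
    // What the new MetasrvOptions default amounts to for the backend store.
    let backend_tls = Some(TlsOption::prefer());
    assert_eq!(backend_tls.unwrap().mode, TlsMode::Prefer);
}
```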
```diff
@@ -32,6 +32,17 @@ SHOW CREATE TABLE comment_table_test;
 | | ) |
 +--------------------+---------------------------------------------------+

+SELECT table_comment
+FROM information_schema.tables
+WHERE table_schema = 'public'
+AND table_name = 'comment_table_test';
+
++-------------------------+
+| table_comment           |
++-------------------------+
+| table level description |
++-------------------------+
+
 -- Remove table comment
 COMMENT ON TABLE comment_table_test IS NULL;
```
```diff
@@ -54,6 +65,17 @@ SHOW CREATE TABLE comment_table_test;
 | | |
 +--------------------+---------------------------------------------------+

+SELECT table_comment
+FROM information_schema.tables
+WHERE table_schema = 'public'
+AND table_name = 'comment_table_test';
+
++---------------+
+| table_comment |
++---------------+
+|               |
++---------------+
+
 DROP TABLE comment_table_test;

 Affected Rows: 0
```
```diff
@@ -90,6 +112,18 @@ SHOW CREATE TABLE comment_column_test;
 | | |
 +---------------------+---------------------------------------------------------+

+SELECT column_comment
+FROM information_schema.columns
+WHERE table_schema = 'public'
+AND table_name = 'comment_column_test'
+AND column_name = 'val';
+
++--------------------------+
+| column_comment           |
++--------------------------+
+| value column description |
++--------------------------+
+
 -- Remove column comment
 COMMENT ON COLUMN comment_column_test.val IS NULL;
```
```diff
@@ -112,6 +146,18 @@ SHOW CREATE TABLE comment_column_test;
 | | |
 +---------------------+----------------------------------------------------+

+SELECT column_comment
+FROM information_schema.columns
+WHERE table_schema = 'public'
+AND table_name = 'comment_column_test'
+AND column_name = 'val';
+
++----------------+
+| column_comment |
++----------------+
+|                |
++----------------+
+
 DROP TABLE comment_column_test;

 Affected Rows: 0
```
```diff
@@ -155,6 +201,16 @@ SHOW CREATE FLOW flow_comment_test;
 | | AS SELECT desc_str, ts FROM flow_source_comment_test |
 +-------------------+------------------------------------------------------+

+SELECT comment
+FROM information_schema.flows
+WHERE flow_name = 'flow_comment_test';
+
++------------------------+
+| comment                |
++------------------------+
+| flow level description |
++------------------------+
+
 -- Remove flow comment
 COMMENT ON FLOW flow_comment_test IS NULL;
```
```diff
@@ -170,6 +226,16 @@ SHOW CREATE FLOW flow_comment_test;
 | | AS SELECT desc_str, ts FROM flow_source_comment_test |
 +-------------------+------------------------------------------------------+

+SELECT comment
+FROM information_schema.flows
+WHERE flow_name = 'flow_comment_test';
+
++---------+
+| comment |
++---------+
+|         |
++---------+
+
 DROP FLOW flow_comment_test;

 Affected Rows: 0
```
```diff
@@ -9,10 +9,18 @@ CREATE TABLE comment_table_test (
 -- Add table comment
 COMMENT ON TABLE comment_table_test IS 'table level description';
 SHOW CREATE TABLE comment_table_test;
+SELECT table_comment
+FROM information_schema.tables
+WHERE table_schema = 'public'
+AND table_name = 'comment_table_test';

 -- Remove table comment
 COMMENT ON TABLE comment_table_test IS NULL;
 SHOW CREATE TABLE comment_table_test;
+SELECT table_comment
+FROM information_schema.tables
+WHERE table_schema = 'public'
+AND table_name = 'comment_table_test';

 DROP TABLE comment_table_test;
```
```diff
@@ -27,10 +35,20 @@ CREATE TABLE comment_column_test (
 -- Add column comment
 COMMENT ON COLUMN comment_column_test.val IS 'value column description';
 SHOW CREATE TABLE comment_column_test;
+SELECT column_comment
+FROM information_schema.columns
+WHERE table_schema = 'public'
+AND table_name = 'comment_column_test'
+AND column_name = 'val';

 -- Remove column comment
 COMMENT ON COLUMN comment_column_test.val IS NULL;
 SHOW CREATE TABLE comment_column_test;
+SELECT column_comment
+FROM information_schema.columns
+WHERE table_schema = 'public'
+AND table_name = 'comment_column_test'
+AND column_name = 'val';

 DROP TABLE comment_column_test;
```
```diff
@@ -54,12 +72,17 @@ SELECT desc_str, ts FROM flow_source_comment_test;
 -- Add flow comment
 COMMENT ON FLOW flow_comment_test IS 'flow level description';
 SHOW CREATE FLOW flow_comment_test;
+SELECT comment
+FROM information_schema.flows
+WHERE flow_name = 'flow_comment_test';

 -- Remove flow comment
 COMMENT ON FLOW flow_comment_test IS NULL;
 SHOW CREATE FLOW flow_comment_test;
+SELECT comment
+FROM information_schema.flows
+WHERE flow_name = 'flow_comment_test';

 DROP FLOW flow_comment_test;
 DROP TABLE flow_source_comment_test;
 DROP TABLE flow_sink_comment_test;
```