Compare commits

...

9 Commits

Author SHA1 Message Date
LFC
6ed6e39673 release v0.3.1 2023-06-21 16:28:41 +08:00
LFC
e5c61ec290 release v0.3.1 2023-06-21 16:28:07 +08:00
Ruihang Xia
b1ccc7ef5d fix: prevent filter pushdown in distributed planner (#1806)
* fix: prevent filter pushdown in distributed planner

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix metadata

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2023-06-21 16:25:50 +08:00
Lei, HUANG
d1b5ce0d35 chore: check catalog deregister result (#1810)
* chore: check deregister result and return error on failure

* refactor: SystemCatalog::deregister_table returns Result<()>
2023-06-21 08:09:11 +00:00
Lei, HUANG
a314993ab4 chore: change logstore default config (#1809) 2023-06-21 07:34:24 +00:00
LFC
fa522bc579 fix: drop region alive countdown tasks when deregistering table (#1808) 2023-06-21 14:49:32 +08:00
Lei, HUANG
5335203360 feat: support cross compilation to aarch64 linux (#1802) 2023-06-21 14:08:45 +08:00
Ruihang Xia
23bf55a265 fix: __field__ matcher on single value column (#1805)
* fix error text and field_column_names

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add sqlness test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add empty line

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* improve style

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2023-06-21 10:59:58 +08:00
Eugene Tolbakov
3b91fc2c64 feat: add initial implementation for status endpoint (#1789)
* feat: add initial implementation for status endpoint

* feat(status_endpoint): add more data to response

* feat(status_endpoint): use build data env vars

* feat(status_endpoint): add simple test

* fix(status_endpoint): adjust the toml indentation
2023-06-21 10:50:08 +08:00
27 changed files with 307 additions and 58 deletions

View File

@@ -22,7 +22,7 @@ name: Release
 env:
   RUST_TOOLCHAIN: nightly-2023-05-03
-  SCHEDULED_BUILD_VERSION_PREFIX: v0.4.0
+  SCHEDULED_BUILD_VERSION_PREFIX: v0.3.1
   SCHEDULED_PERIOD: nightly

Cargo.lock generated
View File

@@ -8520,6 +8520,7 @@ dependencies = [
  "axum-macros",
  "axum-test-helper",
  "base64 0.13.1",
+ "build-data",
  "bytes",
  "catalog",
  "chrono",

View File

@@ -50,7 +50,7 @@ members = [
 ]
 
 [workspace.package]
-version = "0.4.0"
+version = "0.3.1"
 edition = "2021"
 license = "Apache-2.0"

Cross.toml Normal file
View File

@@ -0,0 +1,7 @@
+[build]
+pre-build = [
+    "dpkg --add-architecture $CROSS_DEB_ARCH",
+    "apt update && apt install -y unzip zlib1g-dev:$CROSS_DEB_ARCH",
+    "curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v3.15.8/protoc-3.15.8-linux-x86_64.zip && unzip protoc-3.15.8-linux-x86_64.zip -d /usr/",
+    "chmod a+x /usr/bin/protoc && chmod -R a+rx /usr/include/google",
+]
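With this file in place, an aarch64 artifact can be produced locally with the cross tool, e.g. cross build --target aarch64-unknown-linux-gnu --release (invocation shown for illustration; the release workflow drives the same hooks in CI). The pre-build steps install the target-arch zlib and a protoc binary inside the build container before compilation starts.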

View File

@@ -26,8 +26,8 @@ tcp_nodelay = true
 [wal]
 # WAL data directory
 # dir = "/tmp/greptimedb/wal"
-file_size = "1GB"
-purge_threshold = "50GB"
+file_size = "256MB"
+purge_threshold = "4GB"
 purge_interval = "10m"
 read_batch_size = 128
 sync_write = false

View File

@@ -81,9 +81,9 @@ addr = "127.0.0.1:4004"
 # WAL data directory
 # dir = "/tmp/greptimedb/wal"
 # WAL file size in bytes.
-file_size = "1GB"
-# WAL purge threshold in bytes.
-purge_threshold = "50GB"
+file_size = "256MB"
+# WAL purge threshold.
+purge_threshold = "4GB"
 # WAL purge interval in seconds.
 purge_interval = "10m"
 # WAL read batch size.

View File

@@ -467,10 +467,7 @@ impl CatalogManager for LocalCatalogManager {
             .ident
             .table_id;
-        if !self.system.deregister_table(&request, table_id).await? {
-            return Ok(false);
-        }
+        self.system.deregister_table(&request, table_id).await?;
         self.catalogs.deregister_table(request).await
     }
 }
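With this change a failed system-catalog delete is no longer collapsed into Ok(false): the error propagates through ?, and the "deleted row count != 1" case is downgraded to a warning inside SystemCatalog::deregister_table (see that diff further down).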

View File

@@ -662,7 +662,7 @@ impl CatalogManager for RemoteCatalogManager {
                 .await;
         }
-        Ok(result.is_none())
+        Ok(true)
     }
 
     async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {

View File

@@ -104,10 +104,14 @@ impl RegionAliveKeepers {
         Ok(())
     }
 
-    pub async fn deregister_table(&self, table_ident: &TableIdent) {
-        if self.keepers.lock().await.remove(table_ident).is_some() {
+    pub async fn deregister_table(
+        &self,
+        table_ident: &TableIdent,
+    ) -> Option<Arc<RegionAliveKeeper>> {
+        self.keepers.lock().await.remove(table_ident).map(|x| {
             info!("Deregister RegionAliveKeeper for table {table_ident}");
-        }
+            x
+        })
     }
 
     pub async fn register_region(&self, region_ident: &RegionIdent) {
@@ -127,7 +131,7 @@ impl RegionAliveKeepers {
             warn!("Alive keeper for region {region_ident} is not found!");
             return;
         };
-        keeper.deregister_region(region_ident.region_number).await
+        let _ = keeper.deregister_region(region_ident.region_number).await;
     }
 
     pub async fn start(&self) {
@@ -230,9 +234,11 @@ impl RegionAliveKeeper {
             return;
         }
 
-        let countdown_task_handles = self.countdown_task_handles.clone();
+        let countdown_task_handles = Arc::downgrade(&self.countdown_task_handles);
         let on_task_finished = async move {
-            let _ = countdown_task_handles.lock().await.remove(&region);
+            if let Some(x) = countdown_task_handles.upgrade() {
+                x.lock().await.remove(&region);
+            } // Else the countdown task handles map could be dropped because the keeper is dropped.
         };
         let handle = Arc::new(CountdownTaskHandle::new(
             self.table_engine.clone(),
@@ -259,19 +265,18 @@ impl RegionAliveKeeper {
         }
     }
 
-    async fn deregister_region(&self, region: RegionNumber) {
-        if self
-            .countdown_task_handles
+    async fn deregister_region(&self, region: RegionNumber) -> Option<Arc<CountdownTaskHandle>> {
+        self.countdown_task_handles
             .lock()
             .await
             .remove(&region)
-            .is_some()
-        {
-            info!(
-                "Deregister alive countdown for region {region} in table {}",
-                self.table_ident
-            )
-        }
+            .map(|x| {
+                info!(
+                    "Deregister alive countdown for region {region} in table {}",
+                    self.table_ident
+                );
+                x
+            })
     }
 
     async fn start(&self) {
@@ -319,6 +324,8 @@ enum CountdownCommand {
 struct CountdownTaskHandle {
     tx: mpsc::Sender<CountdownCommand>,
     handler: JoinHandle<()>,
+    table_ident: TableIdent,
+    region: RegionNumber,
 }
 
 impl CountdownTaskHandle {
@@ -341,7 +348,7 @@ impl CountdownTaskHandle {
         let mut countdown_task = CountdownTask {
             table_engine,
-            table_ident,
+            table_ident: table_ident.clone(),
             region,
             rx,
         };
@@ -350,7 +357,12 @@ impl CountdownTaskHandle {
             on_task_finished().await;
         });
 
-        Self { tx, handler }
+        Self {
+            tx,
+            handler,
+            table_ident,
+            region,
+        }
     }
 
     async fn start(&self, heartbeat_interval_millis: u64) {
@@ -378,7 +390,11 @@ impl CountdownTaskHandle {
 impl Drop for CountdownTaskHandle {
     fn drop(&mut self) {
-        self.handler.abort()
+        debug!(
+            "Aborting region alive countdown task for region {} in table {}",
+            self.region, self.table_ident,
+        );
+        self.handler.abort();
     }
 }
 
@@ -640,7 +656,8 @@ mod test {
         regions.sort();
         assert_eq!(regions, vec![2, 3, 4]);
 
-        keepers.deregister_table(&table_ident).await;
+        let keeper = keepers.deregister_table(&table_ident).await.unwrap();
+        assert!(Arc::try_unwrap(keeper).is_ok(), "keeper is not dropped");
         assert!(keepers.keepers.lock().await.is_empty());
     }
 
@@ -676,7 +693,8 @@ mod test {
         // assert keep_lived works if keeper is started
        assert!(keeper.deadline(region).await.unwrap() <= ten_seconds_later());
 
-        keeper.deregister_region(region).await;
+        let handle = keeper.deregister_region(region).await.unwrap();
+        assert!(Arc::try_unwrap(handle).is_ok(), "handle is not dropped");
         assert!(keeper.find_handle(&region).await.is_none());
     }
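The core of this fix is swapping the strong Arc clone captured by the countdown task's completion callback for a Weak handle, which breaks the reference cycle that previously kept a deregistered keeper and its tasks alive; the Arc::try_unwrap assertions in the tests then prove the handles really are dropped. A minimal self-contained sketch of the pattern (toy types, not the actual GreptimeDB ones):

use std::collections::HashMap;
use std::sync::{Arc, Weak};

use tokio::sync::Mutex;

type Handles = Arc<Mutex<HashMap<u32, &'static str>>>;

// Spawn a task that removes its own entry when it finishes, holding only a
// Weak reference so the task does not keep the owning map alive.
fn spawn_self_cleaning(handles: &Handles, key: u32) {
    let weak: Weak<Mutex<HashMap<u32, &'static str>>> = Arc::downgrade(handles);
    tokio::spawn(async move {
        // ... the countdown work would happen here ...
        if let Some(map) = weak.upgrade() {
            map.lock().await.remove(&key);
        } // If upgrade() fails, the owner was already dropped: nothing to do.
    });
}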

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use common_catalog::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_TABLE_NAME};
 use common_telemetry::logging;
+use snafu::ResultExt;
 use table::metadata::TableId;
 use table::{Table, TableRef};
@@ -91,12 +92,21 @@ impl SystemCatalog {
         &self,
         request: &DeregisterTableRequest,
         table_id: TableId,
-    ) -> CatalogResult<bool> {
+    ) -> CatalogResult<()> {
         self.information_schema
             .system
             .delete(build_table_deletion_request(request, table_id))
             .await
-            .map(|x| x == 1)
+            .map(|x| {
+                if x != 1 {
+                    let table = common_catalog::format_full_table_name(
+                        &request.catalog,
+                        &request.schema,
+                        &request.table_name,
+                    );
+                    logging::warn!("Failed to delete table record from information_schema, unexpected returned result: {x}, table: {table}");
+                }
+            })
             .with_context(|_| error::DeregisterTableSnafu {
                 request: request.clone(),
             })

View File

@@ -52,4 +52,4 @@ serde.workspace = true
 toml = "0.5"
 
 [build-dependencies]
-build-data = "0.1.3"
+build-data = "0.1.4"

View File

@@ -192,8 +192,8 @@ impl Default for WalConfig {
     fn default() -> Self {
         Self {
             dir: None,
-            file_size: ReadableSize::gb(1),        // log file size 1GB
-            purge_threshold: ReadableSize::gb(50), // purge threshold 50GB
+            file_size: ReadableSize::mb(256),      // log file size 256MB
+            purge_threshold: ReadableSize::gb(4),  // purge threshold 4GB
             purge_interval: Duration::from_secs(600),
             read_batch_size: 128,
             sync_write: false,
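Net effect: a fresh datanode now caps WAL disk usage at roughly 4GB instead of 50GB, and the smaller 256MB log files let the purger reclaim space in finer increments. The two example config files earlier in this compare are updated to match these defaults.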

View File

@@ -541,7 +541,7 @@ impl PromPlanner {
                     result_set.insert(matcher.value.clone());
                 } else {
                     return Err(ColumnNotFoundSnafu {
-                        col: self.ctx.table_name.clone().unwrap(),
+                        col: matcher.value.clone(),
                     }
                     .build());
                 }
@@ -550,8 +550,8 @@ impl PromPlanner {
                 if col_set.contains(&matcher.value) {
                     reverse_set.insert(matcher.value.clone());
                 } else {
-                    return Err(ValueNotFoundSnafu {
-                        table: self.ctx.table_name.clone().unwrap(),
+                    return Err(ColumnNotFoundSnafu {
+                        col: matcher.value.clone(),
                     }
                     .build());
                 }

View File

@@ -14,7 +14,7 @@
 
 use std::sync::Arc;
 
-use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
+use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNode};
 use promql::extension_plan::{
     EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
 };
@@ -37,7 +37,8 @@ impl Categorizer {
     pub fn check_plan(plan: &LogicalPlan) -> Commutativity {
         match plan {
             LogicalPlan::Projection(_) => Commutativity::Unimplemented,
-            LogicalPlan::Filter(_) => Commutativity::Commutative,
+            // TODO(ruihang): Change this to Commutative once Like is supported in substrait
+            LogicalPlan::Filter(filter) => Self::check_expr(&filter.predicate),
             LogicalPlan::Window(_) => Commutativity::Unimplemented,
             LogicalPlan::Aggregate(_) => {
                 // check all children exprs and uses the strictest level
@@ -85,6 +86,50 @@ impl Categorizer {
             _ => Commutativity::Unsupported,
         }
     }
 
+    pub fn check_expr(expr: &Expr) -> Commutativity {
+        match expr {
+            Expr::Alias(_, _)
+            | Expr::Column(_)
+            | Expr::ScalarVariable(_, _)
+            | Expr::Literal(_)
+            | Expr::BinaryExpr(_)
+            | Expr::Not(_)
+            | Expr::IsNotNull(_)
+            | Expr::IsNull(_)
+            | Expr::IsTrue(_)
+            | Expr::IsFalse(_)
+            | Expr::IsNotTrue(_)
+            | Expr::IsNotFalse(_)
+            | Expr::Negative(_)
+            | Expr::Between(_)
+            | Expr::Sort(_)
+            | Expr::Exists(_) => Commutativity::Commutative,
+
+            Expr::Like(_)
+            | Expr::ILike(_)
+            | Expr::SimilarTo(_)
+            | Expr::IsUnknown(_)
+            | Expr::IsNotUnknown(_)
+            | Expr::GetIndexedField(_)
+            | Expr::Case(_)
+            | Expr::Cast(_)
+            | Expr::TryCast(_)
+            | Expr::ScalarFunction(_)
+            | Expr::ScalarUDF(_)
+            | Expr::AggregateFunction(_)
+            | Expr::WindowFunction(_)
+            | Expr::AggregateUDF(_)
+            | Expr::InList(_)
+            | Expr::InSubquery(_)
+            | Expr::ScalarSubquery(_)
+            | Expr::Wildcard => Commutativity::Unimplemented,
+
+            Expr::QualifiedWildcard { .. }
+            | Expr::GroupingSet(_)
+            | Expr::Placeholder(_)
+            | Expr::OuterReferenceColumn(_, _) => Commutativity::Unimplemented,
+        }
+    }
 }
 
 pub type Transformer = Arc<dyn Fn(&LogicalPlan) -> Option<LogicalPlan>>;
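The exhaustive match is the load-bearing design choice here: with no wildcard arm over Expr, a DataFusion upgrade that introduces a new expression variant should fail compilation until someone classifies it, so nothing silently defaults to being pushed below MergeScan. A toy analogue of the pattern (hypothetical types, not the real DataFusion Expr):

// Toy analogue of Categorizer::check_expr: no wildcard arm, so adding a new
// expression kind is a compile error until its pushdown fate is decided.
enum Expr {
    Column(String),
    Literal(i64),
    Like { column: String, pattern: String },
}

enum Commutativity {
    Commutative,
    Unimplemented,
}

fn check_expr(expr: &Expr) -> Commutativity {
    match expr {
        // Round-trips through substrait today: safe to push below the scan.
        Expr::Column(_) | Expr::Literal(_) => Commutativity::Commutative,
        // Not yet serializable: keep the filter above the scan.
        Expr::Like { .. } => Commutativity::Unimplemented,
    }
}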

View File

@@ -103,3 +103,6 @@ table = { path = "../table" }
 tokio-postgres = "0.7"
 tokio-postgres-rustls = "0.10"
 tokio-test = "0.4"
+
+[build-dependencies]
+build-data = "0.1.4"

View File

@@ -13,6 +13,12 @@
 // limitations under the License.
 
 fn main() {
+    build_data::set_RUSTC_VERSION();
+    build_data::set_BUILD_HOSTNAME();
+    build_data::set_GIT_BRANCH();
+    build_data::set_GIT_COMMIT();
+    build_data::set_SOURCE_TIMESTAMP();
+
     #[cfg(feature = "dashboard")]
     fetch_dashboard_assets();
 }
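These build-data helpers boil down to printing cargo:rustc-env directives from the build script, which is what makes env!("GIT_COMMIT") and friends resolve at compile time in the status handler further down. A hand-rolled sketch of the same mechanism for a single variable (illustrative only; the crate also handles errors and non-git sources):

// build.rs (sketch): expose the current git commit as a compile-time env var.
use std::process::Command;

fn main() {
    let commit = Command::new("git")
        .args(["rev-parse", "HEAD"])
        .output()
        .ok()
        .and_then(|out| String::from_utf8(out.stdout).ok())
        .unwrap_or_else(|| "unknown".into());
    // Cargo turns this into an env var visible to env!() in the crate being built.
    println!("cargo:rustc-env=GIT_COMMIT={}", commit.trim());
}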

View File

@@ -512,6 +512,8 @@ impl HttpServer {
                 routing::get(handler::health).post(handler::health),
             );
 
+        router = router.route("/status", routing::get(handler::status));
+
         #[cfg(feature = "dashboard")]
         {
             if !self.options.disable_dashboard {
View File

@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::env;
 use std::time::Instant;
 
 use aide::transform::TransformOperation;
@@ -158,3 +159,26 @@ pub struct HealthResponse {}
 pub async fn health(Query(_params): Query<HealthQuery>) -> Json<HealthResponse> {
     Json(HealthResponse {})
 }
+
+#[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
+pub struct StatusResponse<'a> {
+    pub source_time: &'a str,
+    pub commit: &'a str,
+    pub branch: &'a str,
+    pub rustc_version: &'a str,
+    pub hostname: &'a str,
+    pub version: &'a str,
+}
+
+/// Handler to expose information about runtime, build, etc.
+#[axum_macros::debug_handler]
+pub async fn status() -> Json<StatusResponse<'static>> {
+    Json(StatusResponse {
+        source_time: env!("SOURCE_TIMESTAMP"),
+        commit: env!("GIT_COMMIT"),
+        branch: env!("GIT_BRANCH"),
+        rustc_version: env!("RUSTC_VERSION"),
+        hostname: env!("BUILD_HOSTNAME"),
+        version: env!("CARGO_PKG_VERSION"),
+    })
+}
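Everything the handler returns is baked in at compile time via env!, so a GET /status responds with a fixed JSON object (source_time, commit, branch, rustc_version, hostname, version) and does no runtime work beyond serialization.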

View File

@@ -365,3 +365,18 @@ async fn test_health() {
         expected_json_str
     );
 }
+
+#[tokio::test]
+async fn test_status() {
+    let expected_json = http_handler::StatusResponse {
+        source_time: env!("SOURCE_TIMESTAMP"),
+        commit: env!("GIT_COMMIT"),
+        branch: env!("GIT_BRANCH"),
+        rustc_version: env!("RUSTC_VERSION"),
+        hostname: env!("BUILD_HOSTNAME"),
+        version: env!("CARGO_PKG_VERSION"),
+    };
+
+    let Json(json) = http_handler::status().await;
+    assert_eq!(json, expected_json);
+}

View File

@@ -28,7 +28,8 @@ use table::engine::{EngineContext, TableEngineProcedureRef, TableReference};
 use table::requests::DropTableRequest;
 
 use crate::error::{
-    AccessCatalogSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu, TableNotFoundSnafu,
+    AccessCatalogSnafu, DeregisterTableSnafu, DeserializeProcedureSnafu, SerializeProcedureSnafu,
+    TableNotFoundSnafu,
 };
 
 /// Procedure to drop a table.
@@ -158,10 +159,17 @@ impl DropTableProcedure {
                 schema: self.data.request.schema_name.clone(),
                 table_name: self.data.request.table_name.clone(),
             };
-            self.catalog_manager
+            if !self
+                .catalog_manager
                 .deregister_table(deregister_table_req)
                 .await
-                .context(AccessCatalogSnafu)?;
+                .context(AccessCatalogSnafu)?
+            {
+                return DeregisterTableSnafu {
+                    name: request.table_ref().to_string(),
+                }
+                .fail()?;
+            }
         }
 
         self.data.state = DropTableState::EngineDropTable;

View File

@@ -55,6 +55,9 @@ pub enum Error {
     #[snafu(display("Table already exists: {}", name))]
     TableExists { name: String },
+
+    #[snafu(display("Failed to deregister table: {}", name))]
+    DeregisterTable { name: String },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -64,7 +67,9 @@ impl ErrorExt for Error {
         use Error::*;
 
         match self {
-            SerializeProcedure { .. } | DeserializeProcedure { .. } => StatusCode::Internal,
+            DeregisterTable { .. } | SerializeProcedure { .. } | DeserializeProcedure { .. } => {
+                StatusCode::Internal
+            }
             InvalidRawSchema { source, .. } => source.status_code(),
             AccessCatalog { source, .. } => source.status_code(),
             CatalogNotFound { .. } | SchemaNotFound { .. } | TableExists { .. } => {

View File

@@ -162,15 +162,14 @@ impl TableMeta {
     }
 
     pub fn field_column_names(&self) -> impl Iterator<Item = &String> {
-        let columns_schemas = &self.schema.column_schemas();
-        self.value_indices.iter().filter_map(|idx| {
-            let column = &columns_schemas[*idx];
-            if column.is_time_index() {
-                None
-            } else {
-                Some(&column.name)
-            }
-        })
+        // `value_indices` is wrong under distributed mode. Use the logic copied from DESC TABLE
+        let columns_schemas = self.schema.column_schemas();
+        let primary_key_indices = &self.primary_key_indices;
+        columns_schemas
+            .iter()
+            .enumerate()
+            .filter(|(i, cs)| !primary_key_indices.contains(i) && !cs.is_time_index())
+            .map(|(_, cs)| &cs.name)
     }
 
     /// Returns the new [TableMetaBuilder] after applying given `alter_kind`.
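Put differently, a field column is now defined structurally: any column that is neither part of the primary key nor the time index. That matches what DESC TABLE reports and, per the code comment, stays correct even when value_indices is unreliable in distributed mode.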

View File

@@ -90,11 +90,23 @@ SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=1 WHERE i1.i=
 SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) ORDER BY i;
 
-Error: 3001(EngineExecuteQuery), No field named __correlated_sq_1.i. Valid fields are integers.i, integers.j.
++---+---+
+| i | j |
++---+---+
+| 1 | 1 |
+| 2 | 2 |
+| 3 | 3 |
++---+---+
 
 SELECT * FROM integers WHERE i NOT IN ((SELECT i FROM integers WHERE i=1)) ORDER BY i;
 
-Error: 3001(EngineExecuteQuery), No field named __correlated_sq_2.i. Valid fields are integers.i, integers.j.
++---+---+
+| i | j |
++---+---+
+| 2 | 2 |
+| 3 | 3 |
+|   | 4 |
++---+---+
 
 SELECT * FROM integers WHERE i IN ((SELECT i FROM integers)) AND i<3 ORDER BY i;
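These expected outputs flip from planner errors to actual result sets: the decorrelated subquery columns (__correlated_sq_*) now resolve during distributed planning, evidently a side effect of the planner fixes in #1806.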

View File

@@ -31,3 +31,41 @@ DROP TABLE test;
 
 Affected Rows: 1
 
+CREATE TABLE host_load1 (
+    ts TIMESTAMP(3) NOT NULL,
+    collector STRING NULL,
+    host STRING NULL,
+    val DOUBLE NULL,
+    TIME INDEX (ts),
+    PRIMARY KEY (collector, host)
+);
+
+Affected Rows: 0
+
+-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
+-- SQLNESS REPLACE (peer-.*) REDACTED
+TQL EXPLAIN host_load1{__field__="val"};
+
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                           |
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[ts]                                                                                     |
+|               |   PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [false]                                                                                                        |
+|               |     PromSeriesDivide: tags=["collector", "host"]                                                                                                                               |
+|               |       Sort: host_load1.collector DESC NULLS LAST, host_load1.host DESC NULLS LAST, host_load1.ts DESC NULLS LAST                                                               |
+|               |         Projection: host_load1.val, host_load1.collector, host_load1.host, host_load1.ts                                                                                       |
+|               |           MergeScan [is_placeholder=false]                                                                                                                                     |
+|               |             TableScan: host_load1 projection=[ts, collector, host, val], partial_filters=[ts >= TimestampMillisecond(-300000, None), ts <= TimestampMillisecond(300000, None)] |
+| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[ts]                                                                                 |
+|               |   PromSeriesNormalizeExec: offset=[0], time index=[ts], filter NaN: [false]                                                                                                    |
+|               |     PromSeriesDivideExec: tags=["collector", "host"]                                                                                                                           |
+|               |       RepartitionExec: partitioning=REDACTED
+|               |         ProjectionExec: expr=[val@3 as val, collector@1 as collector, host@2 as host, ts@0 as ts]                                                                              |
+|               |           MergeScanExec: peers=[REDACTED
+|               |                                                                                                                                                                                |
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+
+DROP TABLE host_load1;
+
+Affected Rows: 1
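The captured plan shows the fix working end to end: __field__="val" becomes a plain Projection over val plus the tag and time-index columns, and the only partial_filters pushed into the TableScan are the time-range bounds.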

View File

@@ -9,3 +9,18 @@ INSERT INTO test VALUES (1, 1, "a"), (1, 1, "b"), (2, 2, "a");
 TQL EXPLAIN (0, 10, '5s') test;
 
 DROP TABLE test;
+
+CREATE TABLE host_load1 (
+    ts TIMESTAMP(3) NOT NULL,
+    collector STRING NULL,
+    host STRING NULL,
+    val DOUBLE NULL,
+    TIME INDEX (ts),
+    PRIMARY KEY (collector, host)
+);
+
+-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
+-- SQLNESS REPLACE (peer-.*) REDACTED
+TQL EXPLAIN host_load1{__field__="val"};
+
+DROP TABLE host_load1;

View File

@@ -0,0 +1,29 @@
+CREATE TABLE host (
+    ts TIMESTAMP(3) TIME INDEX,
+    host STRING PRIMARY KEY,
+    val DOUBLE,
+);
+
+Affected Rows: 0
+
+INSERT INTO TABLE host VALUES
+    (0, 'a+b', 1.0),
+    (1, 'b+c', 2.0),
+    (2, 'a', 3.0),
+    (3, 'c', 4.0);
+
+Affected Rows: 4
+
+SELECT * FROM host WHERE host LIKE '%+%';
+
++-------------------------+------+-----+
+| ts                      | host | val |
++-------------------------+------+-----+
+| 1970-01-01T00:00:00     | a+b  | 1.0 |
+| 1970-01-01T00:00:00.001 | b+c  | 2.0 |
++-------------------------+------+-----+
+
+DROP TABLE host;
+
+Affected Rows: 1
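This is the user-visible regression test for #1806: the LIKE predicate now stays above MergeScan, so the query returns its two matching rows instead of erroring during distributed planning (Like is not yet representable in substrait, per the TODO in the commutativity diff).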

View File

@@ -0,0 +1,15 @@
+CREATE TABLE host (
+    ts TIMESTAMP(3) TIME INDEX,
+    host STRING PRIMARY KEY,
+    val DOUBLE,
+);
+
+INSERT INTO TABLE host VALUES
+    (0, 'a+b', 1.0),
+    (1, 'b+c', 2.0),
+    (2, 'a', 3.0),
+    (3, 'c', 4.0);
+
+SELECT * FROM host WHERE host LIKE '%+%';
+
+DROP TABLE host;