-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Feature][SQL Config] Add SQL config adapter (#6757)
- Loading branch information
Showing
35 changed files
with
2,038 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,189 @@ | ||
# SQL Configuration File | ||
|
||
## Structure of SQL Configuration File | ||
|
||
The `SQL` configuration file appears as follows. | ||
|
||
### SQL | ||
|
||
```sql | ||
/* config | ||
env { | ||
parallelism = 1 | ||
job.mode = "BATCH" | ||
} | ||
*/ | ||
|
||
CREATE TABLE source_table WITH ( | ||
'connector'='jdbc', | ||
'type'='source', | ||
'url' = 'jdbc:mysql://localhost:3306/seatunnel', | ||
'driver' = 'com.mysql.cj.jdbc.Driver', | ||
'user' = 'root', | ||
'password' = '123456', | ||
'query' = 'select * from source', | ||
'properties'= '{ | ||
useSSL = false, | ||
rewriteBatchedStatements = true | ||
}' | ||
); | ||
|
||
CREATE TABLE sink_table WITH ( | ||
'connector'='jdbc', | ||
'type'='sink', | ||
'url' = 'jdbc:mysql://localhost:3306/seatunnel', | ||
'driver' = 'com.mysql.cj.jdbc.Driver', | ||
'user' = 'root', | ||
'password' = '123456', | ||
'generate_sink_sql' = 'true', | ||
'database' = 'seatunnel', | ||
'table' = 'sink' | ||
); | ||
|
||
INSERT INTO sink_table SELECT id, name, age, email FROM source_table; | ||
``` | ||
|
||
## Explanation of `SQL` Configuration File | ||
|
||
### General Configuration in SQL File | ||
|
||
```sql | ||
/* config | ||
env { | ||
parallelism = 1 | ||
job.mode = "BATCH" | ||
} | ||
*/ | ||
``` | ||
|
||
In the `SQL` file, common configuration sections are defined using `/* config */` comments. Inside, common configurations like `env` can be defined using `HOCON` format. | ||
|
||
### SOURCE SQL Syntax | ||
|
||
```sql | ||
CREATE TABLE source_table WITH ( | ||
'connector'='jdbc', | ||
'type'='source', | ||
'url' = 'jdbc:mysql://localhost:3306/seatunnel', | ||
'driver' = 'com.mysql.cj.jdbc.Driver', | ||
'user' = 'root', | ||
'password' = '123456', | ||
'query' = 'select * from source', | ||
'properties' = '{ | ||
useSSL = false, | ||
rewriteBatchedStatements = true | ||
}' | ||
); | ||
``` | ||
|
||
* Using `CREATE TABLE ... WITH (...)` syntax creates a mapping for the source table. The `TABLE` name is the name of the source-mapped table, and the `WITH` syntax contains source-related configuration parameters. | ||
* There are two fixed parameters in the WITH syntax: `connector` and `type`, representing connector plugin name (such as `jdbc`, `FakeSource`, etc.) and source type (fixed as `source`), respectively. | ||
* Other parameter names can reference relevant configuration parameters of the corresponding connector plugin, but the format needs to be changed to `'key' = 'value',`. | ||
* If `'value'` is a sub-configuration, you can directly use a string in `HOCON` format. Note: if using a sub-configuration in `HOCON` format, the internal property items must be separated by `,`, like this: | ||
|
||
```sql | ||
'properties' = '{ | ||
useSSL = false, | ||
rewriteBatchedStatements = true | ||
}' | ||
``` | ||
|
||
* If using `'` within `'value'`, it needs to be escaped with `''`, like this: | ||
|
||
```sql | ||
'query' = 'select * from source where name = ''Joy Ding''' | ||
``` | ||
|
||
### SINK SQL Syntax | ||
|
||
```sql | ||
CREATE TABLE sink_table WITH ( | ||
'connector'='jdbc', | ||
'type'='sink', | ||
'url' = 'jdbc:mysql://localhost:3306/seatunnel', | ||
'driver' = 'com.mysql.cj.jdbc.Driver', | ||
'user' = 'root', | ||
'password' = '123456', | ||
'generate_sink_sql' = 'true', | ||
'database' = 'seatunnel', | ||
'table' = 'sink' | ||
); | ||
``` | ||
|
||
* Using `CREATE TABLE ... WITH (...)` syntax creates a mapping for the target table. The `TABLE` name is the name of the target-mapped table, and the `WITH` syntax contains sink-related configuration parameters. | ||
* There are two fixed parameters in the `WITH` syntax: `connector` and `type`, representing connector plugin name (such as `jdbc`, `console`, etc.) and target type (fixed as `sink`), respectively. | ||
* Other parameter names can reference relevant configuration parameters of the corresponding connector plugin, but the format needs to be changed to `'key' = 'value',`. | ||
|
||
### INSERT INTO SELECT Syntax | ||
|
||
```sql | ||
INSERT INTO sink_table SELECT id, name, age, email FROM source_table; | ||
``` | ||
|
||
* The `SELECT FROM` part is the table name of the source-mapped table. | ||
* The `INSERT INTO` part is the table name of the target-mapped table. | ||
* Note: This syntax does **not support** specifying fields in `INSERT`, like this: `INSERT INTO sink_table (id, name, age, email) SELECT id, name, age, email FROM source_table;` | ||
|
||
### INSERT INTO SELECT TABLE Syntax | ||
|
||
```sql | ||
INSERT INTO sink_table SELECT source_table; | ||
``` | ||
|
||
* The `SELECT` part directly uses the name of the source-mapped table, indicating that all data from the source table will be inserted into the target table. | ||
* Using this syntax does not generate related `transform` configurations. This syntax is generally used in multi-table synchronization scenarios. For example: | ||
|
||
```sql | ||
CREATE TABLE source_table WITH ( | ||
'connector'='jdbc', | ||
'type' = 'source', | ||
'url' = 'jdbc:mysql://127.0.0.1:3306/seatunnel', | ||
'driver' = 'com.mysql.cj.jdbc.Driver', | ||
'user' = 'root', | ||
'password' = '123456', | ||
'table_list' = '[ | ||
{ | ||
table_path = "source.table1" | ||
}, | ||
{ | ||
table_path = "source.table2", | ||
query = "select * from source.table2" | ||
} | ||
]' | ||
); | ||
|
||
CREATE TABLE sink_table WITH ( | ||
'connector'='jdbc', | ||
'type' = 'sink', | ||
'url' = 'jdbc:mysql://127.0.0.1:3306/seatunnel', | ||
'driver' = 'com.mysql.cj.jdbc.Driver', | ||
'user' = 'root', | ||
'password' = '123456', | ||
'generate_sink_sql' = 'true', | ||
'database' = 'sink' | ||
); | ||
|
||
INSERT INTO sink_table SELECT source_table; | ||
``` | ||
|
||
### CREATE TABLE AS Syntax | ||
|
||
```sql | ||
CREATE TABLE temp1 AS SELECT id, name, age, email FROM source_table; | ||
``` | ||
|
||
* This syntax creates a temporary table with the result of a `SELECT` query, used for `INSERT INTO` operations. | ||
* The syntax of the `SELECT` part refers to: [SQL-transform](../transform-v2/sql.md) `query` configuration item | ||
|
||
```sql | ||
CREATE TABLE temp1 AS SELECT id, name, age, email FROM source_table; | ||
|
||
INSERT INTO sink_table SELECT * FROM temp1; | ||
``` | ||
|
||
## Example of SQL Configuration File Submission | ||
|
||
```bash | ||
./bin/seatunnel.sh --config ./config/sample.sql | ||
``` | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,189 @@ | ||
# SQL配置文件 | ||
|
||
## SQL配置文件结构 | ||
|
||
`SQL`配置文件类似下面。 | ||
|
||
### SQL | ||
|
||
```sql | ||
/* config | ||
env { | ||
parallelism = 1 | ||
job.mode = "BATCH" | ||
} | ||
*/ | ||
|
||
CREATE TABLE source_table WITH ( | ||
'connector'='jdbc', | ||
'type'='source', | ||
'url' = 'jdbc:mysql://localhost:3306/seatunnel', | ||
'driver' = 'com.mysql.cj.jdbc.Driver', | ||
'user' = 'root', | ||
'password' = '123456', | ||
'query' = 'select * from source', | ||
'properties'= '{ | ||
useSSL = false, | ||
rewriteBatchedStatements = true | ||
}' | ||
); | ||
|
||
CREATE TABLE sink_table WITH ( | ||
'connector'='jdbc', | ||
'type'='sink', | ||
'url' = 'jdbc:mysql://localhost:3306/seatunnel', | ||
'driver' = 'com.mysql.cj.jdbc.Driver', | ||
'user' = 'root', | ||
'password' = '123456', | ||
'generate_sink_sql' = 'true', | ||
'database' = 'seatunnel', | ||
'table' = 'sink' | ||
); | ||
|
||
INSERT INTO sink_table SELECT id, name, age, email FROM source_table; | ||
``` | ||
|
||
## `SQL`配置文件说明 | ||
|
||
### 通用配置 | ||
|
||
```sql | ||
/* config | ||
env { | ||
parallelism = 1 | ||
job.mode = "BATCH" | ||
} | ||
*/ | ||
``` | ||
|
||
在`SQL`文件中通过 `/* config */` 注释定义通用配置部分,内部可以使用`hocon`格式定义通用的配置,如`env`等。 | ||
|
||
### SOURCE SQL语法 | ||
|
||
```sql | ||
CREATE TABLE source_table WITH ( | ||
'connector'='jdbc', | ||
'type'='source', | ||
'url' = 'jdbc:mysql://localhost:3306/seatunnel', | ||
'driver' = 'com.mysql.cj.jdbc.Driver', | ||
'user' = 'root', | ||
'password' = '123456', | ||
'query' = 'select * from source', | ||
'properties' = '{ | ||
useSSL = false, | ||
rewriteBatchedStatements = true | ||
}' | ||
); | ||
``` | ||
|
||
* 使用 `CREATE TABLE ... WITH (...)` 语法可创建源端表映射, `TABLE`表名为源端映射的表名,`WITH`语法中为源端相关的配置参数 | ||
* 在WITH语法中有两个固定参数:`connector` 和 `type`,分别表示连接器插件名(如:`jdbc`、`FakeSource`等)和源端类型(固定为:`source`) | ||
* 其它参数名可以参考对应连接器插件的相关配置参数,但是格式需要改为`'key' = 'value',`的形式 | ||
* 如果`'value'`为一个子配置,可以直接使用`hocon`格式的字符串,注意:如果使用`hocon`格式的子配置,内部的属性项之间必须用`,`分隔!如: | ||
|
||
```sql | ||
'properties' = '{ | ||
useSSL = false, | ||
rewriteBatchedStatements = true | ||
}' | ||
``` | ||
|
||
* 如果在`'value'`中使用到`'`,需要用`''`进行转义,如: | ||
|
||
```sql | ||
'query' = 'select * from source where name = ''Joy Ding''' | ||
``` | ||
|
||
### SINK SQL语法 | ||
|
||
```sql | ||
CREATE TABLE sink_table WITH ( | ||
'connector'='jdbc', | ||
'type'='sink', | ||
'url' = 'jdbc:mysql://localhost:3306/seatunnel', | ||
'driver' = 'com.mysql.cj.jdbc.Driver', | ||
'user' = 'root', | ||
'password' = '123456', | ||
'generate_sink_sql' = 'true', | ||
'database' = 'seatunnel', | ||
'table' = 'sink' | ||
); | ||
``` | ||
|
||
* 使用 `CREATE TABLE ... WITH (...)` 语法可创建目标端表映射, `TABLE`表名为目标端映射的表名,`WITH`语法中为目标端相关的配置参数 | ||
* 在WITH语法中有两个固定参数:`connector` 和 `type`,分别表示连接器插件名(如:`jdbc`、`console`等)和目标端类型(固定为:`sink`) | ||
* 其它参数名可以参考对应连接器插件的相关配置参数,但是格式需要改为`'key' = 'value',`的形式 | ||
|
||
### INSERT INTO SELECT语法 | ||
|
||
```sql | ||
INSERT INTO sink_table SELECT id, name, age, email FROM source_table; | ||
``` | ||
|
||
* `SELECT FROM` 部分为源端映射表的表名,`SELECT` 部分的语法参考:[SQL-transform](../transform-v2/sql.md) `query` 配置项 | ||
* `INSERT INTO` 部分为目标端映射表的表名 | ||
* 注意:该语法**不支持**在 `INSERT` 中指定字段,如:`INSERT INTO sink_table (id, name, age, email) SELECT id, name, age, email FROM source_table;` | ||
|
||
### INSERT INTO SELECT TABLE语法 | ||
|
||
```sql | ||
INSERT INTO sink_table SELECT source_table; | ||
``` | ||
|
||
* `SELECT` 部分直接使用源端映射表的表名,表示将源端表的所有数据插入到目标端表中 | ||
* 使用该语法不会生成`trasform`的相关配置,这种语法一般用在多表同步的场景,示例: | ||
|
||
```sql | ||
CREATE TABLE source_table WITH ( | ||
'connector'='jdbc', | ||
'type' = 'source', | ||
'url' = 'jdbc:mysql://127.0.0.1:3306/seatunnel', | ||
'driver' = 'com.mysql.cj.jdbc.Driver', | ||
'user' = 'root', | ||
'password' = '123456', | ||
'table_list' = '[ | ||
{ | ||
table_path = "source.table1" | ||
}, | ||
{ | ||
table_path = "source.table2", | ||
query = "select * from source.table2" | ||
} | ||
]' | ||
); | ||
|
||
CREATE TABLE sink_table WITH ( | ||
'connector'='jdbc', | ||
'type' = 'sink', | ||
'url' = 'jdbc:mysql://127.0.0.1:3306/seatunnel', | ||
'driver' = 'com.mysql.cj.jdbc.Driver', | ||
'user' = 'root', | ||
'password' = '123456', | ||
'generate_sink_sql' = 'true', | ||
'database' = 'sink' | ||
); | ||
|
||
INSERT INTO sink_table SELECT source_table; | ||
``` | ||
|
||
### CREATE TABLE AS语法 | ||
|
||
```sql | ||
CREATE TABLE temp1 AS SELECT id, name, age, email FROM source_table; | ||
``` | ||
|
||
* 该语法可以将一个`SELECT`查询结果作为一个临时表,用于的`INSERT INTO`操作 | ||
* `SELECT` 部分的语法参考:[SQL-transform](../transform-v2/sql.md) `query` 配置项 | ||
|
||
```sql | ||
CREATE TABLE temp1 AS SELECT id, name, age, email FROM source_table; | ||
|
||
INSERT INTO sink_table SELECT * FROM temp1; | ||
``` | ||
|
||
## SQL配置文件任务提交示例 | ||
|
||
```bash | ||
./bin/seatunnel.sh --config ./config/sample.sql | ||
``` | ||
|
Oops, something went wrong.