SeaTunnel DB2 Source Connector 使用文档(含详细操作步骤)
DB2是IBM的一款关系型数据库管理系统,JDBC DB2 Source Connector是一个用于通过JDBC读取外部数据源数据的连接器。Apache SeaTunnel如何支持JDBC DB2 Source Connector?请参考本文档。
支持引擎
Spark
Flink
SeaTunnel Zeta引擎
主要特性
- 批处理(batch)
- 精确一次(exactly-once)
- 列投影(column projection)
- 并行处理(parallelism)
- 支持用户自定义分割(support user-defined split)
它支持查询SQL并可以实现投影效果。
描述
通过 JDBC 读取外部数据源数据。
支持数据源
数据源 | 支持版本 | 驱动程序 | URL | Maven |
---|---|---|---|---|
DB2 | 不同的依赖版本有不同的驱动程序类。 | com.ibm.db2.jdbc.app.DB2Driver | jdbc:db2://127.0.0.1:50000/dbname | Download |
数据库依赖
数据库依赖需要下载对应'Maven'的支持列表并复制到
'$SEATNUNNEL_HOME/plugins/jdbc/lib/'
工作目录。
例如,DB2数据源:cp db2-connector-java-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/
数据类型映射
DB2数据类型 | SeaTunnel数据类型 |
---|---|
BOOLEAN | BOOLEAN |
SMALLINT | SHORT |
INT INTEGER |
INTEGER |
BIGINT | LONG |
DECIMAL DEC NUMERIC NUM |
DECIMAL(38,18) |
REAL | FLOAT |
FLOAT DOUBLE DOUBLE PRECISION DECFLOAT |
DOUBLE |
CHAR VARCHAR LONG VARCHAR CLOB GRAPHIC VARGRAPHIC LONG VARGRAPHIC DBCLOB |
STRING |
BLOB | BYTES |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
ROWID XML |
Not supported yet |
源选项
名称 | 类型 | 必需 | 默认 | 描述 |
---|---|---|---|---|
url | String | Yes | - | JDBC连接的URL。参考案例:jdbc:db2:127.0.0.1:50000/dbname |
driver | String | Yes | - | 用于连接到远程数据源的jdbc类名。如果你使用db2,值是com.ibm.db2.jdbc.app.DB2Driver 。 |
user | String | No | - | 连接实例用户名 |
password | String | No | - | 连接实例密码 |
query | String | Yes | - | 查询语句 |
connection_check_timeout_sec | Int | No | 30 | 用于验证连接到完成以等待数据库操作的时间(以秒为单位) |
partition_column | String | No | - | 用于并行处理的分区列名,仅支持数值类型,仅支持数值类型主键,并且只能配置一个列。 |
partition_lower_bound | Long | No | - | partition_column的扫描最小值,如果未设置,SeaTunnel将查询数据库获取最小值。 |
partition_upper_bound | Long | No | - | partition_column的扫描最大值,如果未设置,SeaTunnel将查询数据库获取最大值。 |
partition_num | Int | 可选 | 作业并行度 | 分区计数的数量,仅支持正整数。默认值为作业的并行度。 |
fetch_size | Int | 可选 | 0 | 对于返回大量对象的查询,您可以配置在查询中使用的行抓取大小,以提高性能,减少满足选择条件所需的数据库访问次数。零表示使用 JDBC 的默认值。 |
common-options | 可选 | - | 源插件的通用参数,请参考 Source Common Options 获取详细信息。 |
提示
如果未设置
partition_column
,则将以单一并发方式运行;如果设置了partition_column
,则根据任务的并发度并行执行。
任务示例
简单示例:
此示例在单一并发模式下查询您的测试数据库中类型为 'table' 的 16 条数据,并查询其所有字段。您还可以指定要查询的最终输出到控制台的字段。
# Defining the runtime environment
env {
# You can set flink configuration here
execution.parallelism = 2
job.mode = "BATCH"
}
source{
Jdbc {
url = "jdbc:db2://127.0.0.1:50000/dbname"
driver = "com.ibm.db2.jdbc.app.DB2Driver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
query = "select * from table_xxx"
}
}
transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform-v2/sql
}
sink {
Console {}
}
并行示例:
如果想要读取整个表,可以根据您配置的分片字段和分片数据,在并行方式下读取查询表。
source {
Jdbc {
url = "jdbc:db2://127.0.0.1:50000/dbname"
driver = "com.ibm.db2.jdbc.app.DB2Driver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
# Define query logic as required
query = "select * from type_bin"
# Parallel sharding reads fields
partition_column = "id"
# Number of fragments
partition_num = 10
}
}
并行边界:
在查询中指定数据的上限和下限边界更加高效。根据您配置的上限和下限边界来读取您的数据源,效率更高。
source {
Jdbc {
url = "jdbc:db2://127.0.0.1:50000/dbname"
driver = "com.ibm.db2.jdbc.app.DB2Driver"
connection_check_timeout_sec = 100
user = "root"
password = "123456"
# Define query logic as required
query = "select * from type_bin"
partition_column = "id"
# Read start boundary
partition_lower_bound = 1
# Read end boundary
partition_upper_bound = 500
partition_num = 10
}
}