其他数据源
除了前文章节介绍的数据源外,Byzer 由于其数据源的扩展特性,我们也可以支持其他数据源,需要注意的是,下述数据源是社区贡献,未经官方验证实测,如果您需要使用,可能需要相关的开发和测试。 如果你希望使用的数据源 Byzer 暂时没有适配,如果它符合 Spark datasource API 标准,也可以进行集成。 具体做法如下: > LOAD unknow.`` WHERE implClass="
除了前文章节介绍的数据源外,Byzer 由于其数据源的扩展特性,我们也可以支持其他数据源,需要注意的是,下述数据源是社区贡献,未经官方验证实测,如果您需要使用,可能需要相关的开发和测试。
如果你希望使用的数据源 Byzer 暂时没有适配,如果它符合 Spark datasource API 标准,也可以进行集成。 具体做法如下:
> LOAD unknow.`` WHERE implClass="数据源完整包名" AS table;
> SAVE table AS unknow.`/tmp/...` WHERE implClass="数据源完整包名";
其中 unknow
这个词汇是可以任意的,因为 Byzer 使用的是 implClass
中配置的完整包名。如果该驱动有其他参数,可以放在 where
从句中进行配置。
MySQL Binlog 同步
MySQL 得到了广泛的使用。数仓的一个核心点是需要将业务的数据库(离线或者实时)同步到数仓当中。离线模式比较简单,直接全量同步覆盖;而实时模式则会略微复杂些,一般而言,走的流程会是:
MySQL -> Cannel(或者一些其他工具) -> Kafka ->
流式引擎 -> Hudi(HBase)等能够提供更新存储的工具 ->
同步或者转储 -> 对外提供服务
我们看到,这是一个很繁琐的的流程。流程越长,总体出问题的概率就越高,调试也会越困难。
Byzer-lang 提供了一个非常简单的解决方案:
MySQL -> Byzer-lang -> Delta(HDFS)
用户的唯一工作是编写一个 Byzer-lang 程序,整个脚本只包含两段代码,一个 Load
, 一个 Save
,非常简单。
下面是 Load 语句:
set streamName="binlog";
load binlog.`` where
host="127.0.0.1"
and port="3306"
and userName="xxx"
and password="xxxx"
and bingLogNamePrefix="mysql-bin"
and binlogIndex="4"
and binlogFileOffset="4"
and databaseNamePattern="mlsql_console"
and tableNamePattern="script_file"
as table1;
set streamName
表明这是一个流式的脚本,并且这个流程序的名字是 binglog.
load
语句我们前面已经学习过,可以加载任意格式或者存储的数据为一张表。这里,我们将 MySQL binglog 的日志加载为一张表。
值得大家关注的参数主要有两组,第一组是 binglog 相关的:
bingLogNamePrefix
:MySQL binglog 配置的前缀。你可以咨询 DBA 来获得。binlogIndex
:从第几个 binglog 进行消费binlogFileOffset
: 从单个 binlog 文件的第几个位置开始消费
binlogFileOffset 并不能随便指定位置,因为他是二进制的,位置是有跳跃的。4 表示一个 binlogFileOffset
的起始位置,是一个特殊的数字。
如果用户不想从起始位置开始,那么可以咨询 DBA 或者自己通过如下命令查看一个合理的位置:
mysqlbinlog \
--start-datetime="2019-06-19 01:00:00" \
--stop-datetime="2019-06-20 23:00:00" \
--base64-output=decode-rows \
-vv master-bin.000004
提示:如果随意指定了一个不合适的位置,则数据无法得到消费,且无法实现增量同步。
第二组参数是过滤哪些库的哪些表需要被同步:
databaseNamePattern
:db 的过滤正则表达式tableNamePattern
: 表名的过滤正则表达式
现在我们得到了包含了 binlog 的 table1, 我们现在要通过它将数据同步到 Delta 表中。
这里一定需要了解,我们是同步数据,而不是同步 binlog 本身。 我们将 table1 持续更新到 Delta中。具体代码如下:
save append table1
as rate.`mysql_{db}.{table}`
options mode="Append"
and idCols="id"
and duration="5"
and syncType="binlog"
and checkpointLocation="/tmp/cpl-binlog2";
这里,我们对每个参数都会做个解释。
-
mysql_{db}.{table}
中的db,table
是占位符。因为我们一次性会同步很多数据库的多张表,如果全部手动指定会显得 非常的麻烦和低效。 Byzer-lang 的rate
数据源允许我们通过占位符进行替换。 -
第二个是
idCols
, 这个参数已经在前面 Delta 的章节中和大家见过面。idCols
需要用户指定一组联合主键,使得 Byzer-lang 能够完成 Upsert 语义。 -
第三个
syncType
表示我们同步的是 binlog , 这样才会执行 binlog 特有的操作。 -
最后两个参数
duration
,checkpointLocation
则是流式计算特有的,分别表示运行的周期以及运行日志存放在哪。
现在,我们已经完成了我们的目标,将任意表同步到 Delta 数据湖中!
目前 Binlog 同步有一些限制:
- MySQL 需要配置
binlog_format
= Row。 当然这个理论上是默认设置。 - 只支持 binlog 中 update/delete/insert 动作的同步。如果修改了数据库表结构,默认会同步失败,用户需要重新全量同步之后再进行增量同步。 如果希望能够继续运行,可以在
save
语句中设置mergeSchema
= "true"。 - 如果不同的表有不同的主键列(需要配置不同的
idCols
),那么可能需要些多个流式同步脚本。
常见错误
如果一直出现
Trying to restore lost connectioin to .....
Connected to ....
那么看看MySQL的 my.cnf 中的 server_id
参数是不是有配置。
ElasticSearch
ElasticSearch 是一个应用很广泛的数据系统。Byzer 也支持将其中的某个索引加载为表。
注意,ES 的包并没有包含在 Byzer 默认发行包里,所以你需要通过 --jars
添加相关的依赖才能使用。
加载数据
例子:
> SET data='''{"jack":"cool"}''';
> LOAD jsonStr.`data` AS data1;
> SAVE overwrite data1 AS es.`twitter/cool` WHERE
`es.index.auto.create`="true"
and es.nodes="127.0.0.1";
> LOAD es.`twitter/cool` WHERE
and es.nodes="127.0.0.1" AS table1;
> SELECT * FROM table1 AS output1;
> CONNECT es WHERE `es.index.auto.create`="true"
and es.nodes="127.0.0.1" AS es_instance;
> LOAD es.`es_instance/twitter/cool` AS table1;
> SELECT * FROM table1 AS output2;
在ES里,数据连接引用和表之间的分隔符不是
.
,而是/
。 这是因为ES索引名允许带"."。
ES 相关的参数可以参考驱动官方文档。
Solr
Solr 是一个应用很广泛的搜索引擎。Byzer 也支持将其中的某个索引加载为表。
注意,Solr 的包并没有包含在 Byzer 默认发行包里,所以你需要通过 --jars
带上相关的依赖才能使用。
加载数据
示例:
> SELECT 1 AS id, "this is mlsql_example" AS title_s AS mlsql_example_data;
> CONNECT solr WHERE `zkhost`="127.0.0.1:9983"
and `collection`="mlsql_example"
and `flatten_multivalued`="false"
AS solr1
;
> LOAD solr.`solr1/mlsql_example` AS mlsql_example;
> SAVE mlsql_example_data AS solr.`solr1/mlsql_example`
OPTIONS soft_commit_secs = "1";
在 Solr 里,数据连接引用和表之间的分隔符不是
.
,而是/
。 这是因为Solr索引名允许带"."。
所有 Solr 相关的参数可以参考驱动官方文档。
HBase 数据源
HBase 是一个应用很广泛的存储系统。Byzer 也支持将其中的某个索引加载为表。
注意,HBase 的包并没有包含在 Byzer 默认发行包里,所以你需要通过 --jars
带上相关的依赖才能使用。用户有两种种方式获得 HBase Jar包:
直接使用 hbase Datasource 插件; (暂不支持)- 访问 spark-hbase ,然后手动进行编译;
1. 安装及使用
1) 插件安装(暂不支持)
通过如下指令安装 ds-hbase-2x
:
> !plugin ds add tech.mlsql.plugins.ds.MLSQLHBase2x ds-hbase-2x;
因为 HBase 依赖很多,大概 80 多 M,下载会比较慢。
2) 在 Byzer 中执行
例子:
> SET rawText='''
{"id":9,"content":"Spark","label":0.0}
{"id":10,"content":"Byzer","label":0.0}
{"id":12,"content":"Byzer lang","label":0.0}
''';
> LOAD jsonStr.`rawText` AS orginal_text_corpus;
> SELECT cast(id as String) AS rowkey,content,label FROM orginal_text_corpus AS orginal_text_corpus1;
-- connect 后面接数据格式,这里是 hbase2x, 然后后面接一些配置。最后将这个连接命名为 hbase1.
> CONNECT hbase2x WHERE `zk`="127.0.0.1:2181"
and `family`="cf" AS hbase1;
> SAVE overwrite orginal_text_corpus1
AS hbase2x.`hbase1:kolo_example`;
> LOAD hbase2x.`hbase1:kolo_example` WHERE field.type.label="DoubleType"
AS kolo_example;
> SELECT * FROM kolo_example AS show_data;
DataFrame 代码:
val data = (0 to 255).map { i =>
HBaseRecord(i, "extra")
}
val tableName = "t1"
val familyName = "c1"
import spark.implicits._
sc.parallelize(data).toDF.write
.options(Map(
"outputTableName" -> cat,
"family" -> family
) ++ options)
.format("org.apache.spark.sql.execution.datasources.hbase2x")
.save()
val df = spark.read.format("org.apache.spark.sql.execution.datasources.hbase2x").options(
Map(
"inputTableName" -> tableName,
"family" -> familyName,
"field.type.col1" -> "BooleanType",
"field.type.col2" -> "DoubleType",
"field.type.col3" -> "FloatType",
"field.type.col4" -> "IntegerType",
"field.type.col5" -> "LongType",
"field.type.col6" -> "ShortType",
"field.type.col7" -> "StringType",
"field.type.col8" -> "ByteType"
)
).load()
2. 配置参数
ds-hbase-2x 的配置参数如下表:
参数名 | 参数含义 |
---|---|
tsSuffix | 覆盖 Hbase 值的时间戳 |
namespace | Hbase namespace |
family | Hbase family,family="" 表示加载所有存在的 family |
field.type.ck | 为 ck(field name) 指定类型,现在支持:LongType、FloatType、DoubleType、IntegerType、BooleanType、BinaryType、TimestampType、DateType,默认为: StringType |
MongoDB
MongoDB 是一个应用很广泛的存储系统。Byzer 也支持将其中的某个索引加载为表。
注意,MongoDB 的包并没有包含在 Byzer 默认发行包里,所以你需要通过 --jars
添加相关的依赖才能使用。
加载数据
例子:
> SET data='''{"jack":"cool"}''';
> LOAD jsonStr.`data` as data1;
> SAVE overwrite data1 AS mongo.`twitter/cool` WHERE
partitioner="MongoPaginateBySizePartitioner"
and uri="mongodb://127.0.0.1:27017/twitter";
> LOAD mongo.`twitter/cool` WHERE
partitioner="MongoPaginateBySizePartitioner"
and uri="mongodb://127.0.0.1:27017/twitter"
AS table1;
> SELECT * FROM table1 AS output1;
> CONNECT mongo WHERE
partitioner="MongoPaginateBySizePartitioner"
and uri="mongodb://127.0.0.1:27017/twitter"
AS mongo_instance;
> LOAD mongo.`mongo_instance/cool` AS table1;
> SELECT * FROM table1 AS output2;
> LOAD mongo.`cool` WHERE
partitioner="MongoPaginateBySizePartitioner"
and uri="mongodb://127.0.0.1:27017/twitter"
AS table1;
> SELECT * FROM table1 AS output3;
在MongoDB里,数据连接引用和表之间的分隔符不是
.
,而是/
。
更多推荐
所有评论(0)