除了前文章节介绍的数据源外,Byzer 由于其数据源的扩展特性,我们也可以支持其他数据源,需要注意的是,下述数据源是社区贡献,未经官方验证实测,如果您需要使用,可能需要相关的开发和测试。

如果你希望使用的数据源 Byzer 暂时没有适配,如果它符合 Spark datasource API 标准,也可以进行集成。 具体做法如下:

> LOAD unknow.`` WHERE implClass="数据源完整包名" AS table;
> SAVE table AS unknow.`/tmp/...` WHERE implClass="数据源完整包名";

其中 unknow 这个词汇是可以任意的,因为 Byzer 使用的是 implClass 中配置的完整包名。如果该驱动有其他参数,可以放在 where 从句中进行配置。

MySQL Binlog 同步

MySQL 得到了广泛的使用。数仓的一个核心点是需要将业务的数据库(离线或者实时)同步到数仓当中。离线模式比较简单,直接全量同步覆盖;而实时模式则会略微复杂些,一般而言,走的流程会是:

MySQL -> Cannel(或者一些其他工具) -> Kafka -> 

流式引擎 -> Hudi(HBase)等能够提供更新存储的工具 -> 

同步或者转储 -> 对外提供服务  

我们看到,这是一个很繁琐的的流程。流程越长,总体出问题的概率就越高,调试也会越困难。

Byzer-lang 提供了一个非常简单的解决方案:

MySQL -> Byzer-lang -> Delta(HDFS)

用户的唯一工作是编写一个 Byzer-lang 程序,整个脚本只包含两段代码,一个 Load, 一个 Save,非常简单。

下面是 Load 语句:

set streamName="binlog";

load binlog.`` where 
host="127.0.0.1"
and port="3306"
and userName="xxx"
and password="xxxx"
and bingLogNamePrefix="mysql-bin"
and binlogIndex="4"
and binlogFileOffset="4"
and databaseNamePattern="mlsql_console"
and tableNamePattern="script_file"
as table1;

set streamName 表明这是一个流式的脚本,并且这个流程序的名字是 binglog.

load 语句我们前面已经学习过,可以加载任意格式或者存储的数据为一张表。这里,我们将 MySQL binglog 的日志加载为一张表。

值得大家关注的参数主要有两组,第一组是 binglog 相关的:

  1. bingLogNamePrefix:MySQL binglog 配置的前缀。你可以咨询 DBA 来获得。
  2. binlogIndex :从第几个 binglog 进行消费
  3. binlogFileOffset: 从单个 binlog 文件的第几个位置开始消费

binlogFileOffset 并不能随便指定位置,因为他是二进制的,位置是有跳跃的。4 表示一个 binlogFileOffset 的起始位置,是一个特殊的数字。

如果用户不想从起始位置开始,那么可以咨询 DBA 或者自己通过如下命令查看一个合理的位置:

mysqlbinlog \ 
--start-datetime="2019-06-19 01:00:00" \ 
--stop-datetime="2019-06-20 23:00:00" \ 
--base64-output=decode-rows \
-vv  master-bin.000004

提示:如果随意指定了一个不合适的位置,则数据无法得到消费,且无法实现增量同步。

第二组参数是过滤哪些库的哪些表需要被同步:

  1. databaseNamePattern:db 的过滤正则表达式
  2. tableNamePattern : 表名的过滤正则表达式

现在我们得到了包含了 binlog 的 table1, 我们现在要通过它将数据同步到 Delta 表中。

这里一定需要了解,我们是同步数据,而不是同步 binlog 本身。 我们将 table1 持续更新到 Delta中。具体代码如下:

save append table1  
as rate.`mysql_{db}.{table}` 
options mode="Append"
and idCols="id"
and duration="5"
and syncType="binlog"
and checkpointLocation="/tmp/cpl-binlog2";

这里,我们对每个参数都会做个解释。

  • mysql_{db}.{table} 中的db,table是占位符。因为我们一次性会同步很多数据库的多张表,如果全部手动指定会显得 非常的麻烦和低效。 Byzer-lang 的 rate 数据源允许我们通过占位符进行替换。

  • 第二个是 idCols, 这个参数已经在前面 Delta 的章节中和大家见过面。idCols 需要用户指定一组联合主键,使得 Byzer-lang 能够完成 Upsert 语义。

  • 第三个 syncType 表示我们同步的是 binlog , 这样才会执行 binlog 特有的操作。

  • 最后两个参数 durationcheckpointLocation 则是流式计算特有的,分别表示运行的周期以及运行日志存放在哪。

现在,我们已经完成了我们的目标,将任意表同步到 Delta 数据湖中!

目前 Binlog 同步有一些限制:

  1. MySQL 需要配置 binlog_format = Row。 当然这个理论上是默认设置。
  2. 只支持 binlog 中 update/delete/insert 动作的同步。如果修改了数据库表结构,默认会同步失败,用户需要重新全量同步之后再进行增量同步。 如果希望能够继续运行,可以在 save 语句中设置 mergeSchema= "true"。
  3. 如果不同的表有不同的主键列(需要配置不同的 idCols ),那么可能需要些多个流式同步脚本。

常见错误

如果一直出现

Trying to restore lost connectioin to .....
Connected to ....

那么看看MySQL的 my.cnf 中的 server_id 参数是不是有配置。

 

ElasticSearch

ElasticSearch 是一个应用很广泛的数据系统。Byzer 也支持将其中的某个索引加载为表。

注意,ES 的包并没有包含在 Byzer 默认发行包里,所以你需要通过 --jars 添加相关的依赖才能使用。

加载数据

例子:

> SET data='''{"jack":"cool"}''';
> LOAD jsonStr.`data` AS data1;

> SAVE overwrite data1 AS es.`twitter/cool` WHERE
`es.index.auto.create`="true"
and es.nodes="127.0.0.1";

> LOAD es.`twitter/cool` WHERE
and es.nodes="127.0.0.1" AS table1;

> SELECT * FROM table1 AS output1;

> CONNECT es WHERE  `es.index.auto.create`="true"
and es.nodes="127.0.0.1" AS es_instance;

> LOAD es.`es_instance/twitter/cool` AS table1;
> SELECT * FROM table1 AS output2;

在ES里,数据连接引用和表之间的分隔符不是.,而是/。 这是因为ES索引名允许带"."。

ES 相关的参数可以参考驱动官方文档

 

Solr

Solr 是一个应用很广泛的搜索引擎。Byzer 也支持将其中的某个索引加载为表。

注意,Solr 的包并没有包含在 Byzer 默认发行包里,所以你需要通过 --jars 带上相关的依赖才能使用。

加载数据

示例:

> SELECT 1 AS id, "this is mlsql_example" AS title_s AS mlsql_example_data;

> CONNECT solr WHERE `zkhost`="127.0.0.1:9983"
and `collection`="mlsql_example"
and `flatten_multivalued`="false"
AS solr1
;

> LOAD solr.`solr1/mlsql_example` AS mlsql_example;

> SAVE mlsql_example_data AS solr.`solr1/mlsql_example`
OPTIONS soft_commit_secs = "1";

在 Solr 里,数据连接引用和表之间的分隔符不是.,而是/。 这是因为Solr索引名允许带"."。

所有 Solr 相关的参数可以参考驱动官方文档

 

HBase 数据源

HBase 是一个应用很广泛的存储系统。Byzer 也支持将其中的某个索引加载为表。

注意,HBase 的包并没有包含在 Byzer 默认发行包里,所以你需要通过 --jars 带上相关的依赖才能使用。用户有两种种方式获得 HBase Jar包:

  1. 直接使用 hbase Datasource 插件; (暂不支持)
  2. 访问 spark-hbase ,然后手动进行编译;

1. 安装及使用

1) 插件安装(暂不支持)

通过如下指令安装 ds-hbase-2x :

> !plugin ds add tech.mlsql.plugins.ds.MLSQLHBase2x ds-hbase-2x;

因为 HBase 依赖很多,大概 80 多 M,下载会比较慢。

2) 在 Byzer 中执行

例子:

> SET rawText='''
{"id":9,"content":"Spark","label":0.0}
{"id":10,"content":"Byzer","label":0.0}
{"id":12,"content":"Byzer lang","label":0.0}
''';

> LOAD jsonStr.`rawText` AS orginal_text_corpus;

> SELECT cast(id as String) AS rowkey,content,label FROM orginal_text_corpus AS orginal_text_corpus1;

-- connect 后面接数据格式,这里是 hbase2x, 然后后面接一些配置。最后将这个连接命名为 hbase1. 
> CONNECT hbase2x WHERE `zk`="127.0.0.1:2181"
and `family`="cf" AS hbase1;

> SAVE overwrite orginal_text_corpus1 
AS hbase2x.`hbase1:kolo_example`;

> LOAD hbase2x.`hbase1:kolo_example` WHERE field.type.label="DoubleType"
AS kolo_example;

> SELECT * FROM kolo_example AS show_data;

DataFrame 代码:

val data = (0 to 255).map { i =>
      HBaseRecord(i, "extra")
    }
val tableName = "t1"
val familyName = "c1"


import spark.implicits._
sc.parallelize(data).toDF.write
  .options(Map(
    "outputTableName" -> cat,
    "family" -> family
  ) ++ options)
  .format("org.apache.spark.sql.execution.datasources.hbase2x")
  .save()
  
val df = spark.read.format("org.apache.spark.sql.execution.datasources.hbase2x").options(
  Map(
    "inputTableName" -> tableName,
    "family" -> familyName,
    "field.type.col1" -> "BooleanType",
    "field.type.col2" -> "DoubleType",
    "field.type.col3" -> "FloatType",
    "field.type.col4" -> "IntegerType",
    "field.type.col5" -> "LongType",
    "field.type.col6" -> "ShortType",
    "field.type.col7" -> "StringType",
    "field.type.col8" -> "ByteType"
  )
).load() 

2. 配置参数

ds-hbase-2x 的配置参数如下表:

参数名参数含义
tsSuffix覆盖 Hbase 值的时间戳
namespaceHbase namespace
familyHbase family,family="" 表示加载所有存在的 family
field.type.ck为 ck(field name) 指定类型,现在支持:LongType、FloatType、DoubleType、IntegerType、BooleanType、BinaryType、TimestampType、DateType,默认为: StringType

 

 

MongoDB

MongoDB 是一个应用很广泛的存储系统。Byzer 也支持将其中的某个索引加载为表。

注意,MongoDB 的包并没有包含在 Byzer 默认发行包里,所以你需要通过 --jars 添加相关的依赖才能使用。

加载数据

例子:

> SET data='''{"jack":"cool"}''';

> LOAD jsonStr.`data` as data1;

> SAVE overwrite data1 AS mongo.`twitter/cool` WHERE
    partitioner="MongoPaginateBySizePartitioner"
    and uri="mongodb://127.0.0.1:27017/twitter";

> LOAD mongo.`twitter/cool` WHERE
    partitioner="MongoPaginateBySizePartitioner"
    and uri="mongodb://127.0.0.1:27017/twitter"
    AS table1;
> SELECT * FROM table1 AS output1;

> CONNECT mongo WHERE
    partitioner="MongoPaginateBySizePartitioner"
    and uri="mongodb://127.0.0.1:27017/twitter" 
    AS mongo_instance;

> LOAD mongo.`mongo_instance/cool` AS table1;

> SELECT * FROM table1 AS output2;

> LOAD mongo.`cool` WHERE
    partitioner="MongoPaginateBySizePartitioner"
    and uri="mongodb://127.0.0.1:27017/twitter"
    AS table1;
> SELECT * FROM table1 AS output3;

在MongoDB里,数据连接引用和表之间的分隔符不是.,而是/

Logo

更多推荐