flink批量(batch)写入mysql/oracle
1、前言博主之前分享过一篇文章,是flink高性能写入关系型数据库,那篇文章的效果虽然可以实现写入数据的高性能,但是牺牲了程序的健壮性,比如遇到不可控因素:数据库重启,连接失效,连接超时等,这样线上运行的程序可能就会出现问题,并且这样的问题可能只会日志打印error,并不会导致程序的挂掉,所以如果出现这样的问题,很难被发现。接下来,博主分享一波源代码,实现流式处理批量写入关系型数据库。整个程序的流
·
1、前言
博主之前分享过一篇文章,是flink高性能写入关系型数据库,那篇文章的效果虽然可以实现写入数据的高性能,但是牺牲了程序的健壮性,比如遇到不可控因素:数据库重启,连接失效,连接超时等,这样线上运行的程序可能就会出现问题,并且这样的问题可能只会日志打印error,并不会导致程序的挂掉,所以如果出现这样的问题,很难被发现。
接下来,博主分享一波源代码,实现流式处理批量写入关系型数据库。
整个程序的流量是这样的: kafka->flink->mysql
2、driver类描述:flink消费kafka数据源,利用window实现每10s进行一次sink。
package com.learn.flinkBatchMysql;
import com.learn.metricsCounter.MyMapper;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class Driver {
public static void main(String[] args) throws Exception {
//1、flink运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、kafka数据源
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","centos:9092");
properties.setProperty("group.id", "aa");
FlinkKafkaConsumer011<String> kafkaSource0 = new FlinkKafkaConsumer011<>("hhhh", new SimpleStringSchema(), properties);
DataStreamSource<String> kafkaSource = env.addSource(kafkaSource0);
//3、流式数据没10s做为一个批次,写入到mysql
SingleOutputStreamOperator<List<String>> streamOperator = kafkaSource.timeWindowAll(Time.seconds(10)).apply(new AllWindowFunction<String, List<String>, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<String> values, Collector<List<String>> out) throws Exception {
ArrayList<String> students = Lists.newArrayList(values);
if (students.size() > 0) {
out.collect(students);
}
}
});
//4、每批的数据批量写入到mysql
streamOperator.addSink(new SinkToMySQL());
env.execute("metricsCounter");
}
}
3、sink类描述:利用batch实现一个窗口内数据输出到mysql。
package com.learn.flinkBatchMysql;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
/**
* 程序功能: 数据批量 sink 数据到 mysql
* */
public class SinkToMySQL extends RichSinkFunction<List<String>> {
private PreparedStatement ps;
private BasicDataSource dataSource;
private Connection connection;
/**
* open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
dataSource = new BasicDataSource();
connection = getConnection(dataSource);
String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
ps = this.connection.prepareStatement(sql);
}
@Override
public void close() throws Exception {
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
/**
* 每批数据的插入都要调用一次 invoke() 方法
*/
@Override
public void invoke(List<String> value, Context context) throws Exception {
//遍历数据集合
for (String student : value) {
ps.setInt(1, 1);
ps.setString(2, student);
ps.setString(3, "123456");
ps.setInt(4, 18);
ps.addBatch();
}
int[] count = ps.executeBatch(); //批量后执行
System.out.println("成功了插入了" + count.length + "行数据");
}
private static Connection getConnection(BasicDataSource dataSource) {
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
//注意,替换成自己本地的 mysql 数据库地址和用户名、密码
dataSource.setUrl("jdbc:mysql://localhost:3306/novel");
dataSource.setUsername("root");
dataSource.setPassword("root");
//设置连接池的一些参数
dataSource.setInitialSize(10);
dataSource.setMaxTotal(50);
dataSource.setMinIdle(2);
Connection con = null;
try {
con = dataSource.getConnection();
System.out.println("创建连接池:" + con);
} catch (Exception e) {
System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
}
return con;
}
}
4、sink类描述:利用batch实现一个窗口内数据输出到oracle。
package com.learn.flinkBatchMysql;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;
/**
* 程序功能: 数据批量 sink 数据到 mysql
* */
public class SinkToMySQL extends RichSinkFunction<List<String>> {
private PreparedStatement ps;
private BasicDataSource dataSource;
private Connection connection;
/**
* open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
dataSource = new BasicDataSource();
connection = getConnection(dataSource);
String sql = "insert into STUDENT(ID,NAME,PASSWORD,AAAA) values(?, ?, ?, ?)";
ps = this.connection.prepareStatement(sql);
}
@Override
public void close() throws Exception {
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
/**
* 每批数据的插入都要调用一次 invoke() 方法
*/
@Override
public void invoke(List<String> value, Context context) throws Exception {
//遍历数据集合
for (String student : value) {
ps.setInt(1, 1);
ps.setString(2, student);
ps.setString(3, "123456");
ps.setInt(4, 18);
ps.addBatch();
}
int[] count = ps.executeBatch(); //批量后执行
System.out.println("成功了插入了" + count.length + "行数据");
}
private static Connection getConnection(BasicDataSource dataSource) {
//设置连接池的一些参数
dataSource.setInitialSize(10);
dataSource.setMaxTotal(50);
dataSource.setMinIdle(2);
Connection con = null;
try {
Class.forName("oracle.jdbc.driver.OracleDriver");
con = DriverManager.getConnection("jdbc:oracle:thin:@localhost:1521:xe", "hr", "hr");
System.out.println("创建连接池:" + con);
} catch (Exception e) {
System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
}
return con;
}
}
oracle的使用建议表名以及字段名全部使用大写,以免报错:如表或视图不存在或者无效的列。
5、总结:
网上很多的例子都是简单的 demo 形式,都是单条数据就创建数据库连接插入 MySQL,如果要写的数据量很大的话,会对 MySQL 的写有很大的压力,如果要提高性能必定要批量的写。就拿我们现在这篇文章来说,如果数据量大的话,聚合一分钟数据达万条,那么这样批量写会比来一条写一条性能提高不知道有多少。
更多推荐
已为社区贡献1条内容
所有评论(0)