1. Introduction

I previously shared an article on high-performance writes from Flink to a relational database. That approach did deliver high write throughput, but it sacrificed robustness: faced with uncontrollable events such as a database restart, an invalidated connection, or a connection timeout, a job running in production can start misbehaving. Worse, such failures often only print an error to the logs without bringing the job down, which makes them very hard to notice.

Below I share the full source code for a streaming job that writes to a relational database in batches.

The overall data flow of the program is: Kafka -> Flink -> MySQL.
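For reference, here is a minimal sketch of the target table the sink below writes to. The column types are assumptions inferred from the PreparedStatement parameters (id INT, name VARCHAR, password VARCHAR, age INT); adjust them to your actual schema.

package com.learn.flinkBatchMysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

//One-off helper that creates the target table. The schema is an assumption
//inferred from the INSERT in SinkToMySQL below; adjust it to your real table.
public class CreateStudentTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/novel", "root", "root");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS Student (" +
                    "id INT, name VARCHAR(64), password VARCHAR(64), age INT)");
        }
    }
}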

2. Driver class: Flink consumes the Kafka source and uses a window to sink the data once every 10 seconds.

package com.learn.flinkBatchMysql;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;


import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Driver {
    public static void main(String[] args) throws Exception {
        //1. Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2. Kafka source
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","centos:9092");
        properties.setProperty("group.id", "aa");
        FlinkKafkaConsumer011<String> kafkaSource0 = new FlinkKafkaConsumer011<>("hhhh", new SimpleStringSchema(), properties);
        DataStreamSource<String> kafkaSource = env.addSource(kafkaSource0);

        //3. Collect the stream into 10-second windows, each becoming one batch to write to MySQL
        SingleOutputStreamOperator<List<String>> streamOperator = kafkaSource.timeWindowAll(Time.seconds(10)).apply(new AllWindowFunction<String, List<String>, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<String> values, Collector<List<String>> out) throws Exception {
                ArrayList<String> students = Lists.newArrayList(values);
                if (students.size() > 0) {
                    out.collect(students);
                }
            }
        });

        //4. Batch-write each window's data to MySQL
        streamOperator.addSink(new SinkToMySQL());

        env.execute("metricsCounter");
    }
}
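Relating back to the robustness concern in the introduction: checkpointing plus a restart strategy lets the job recover from transient failures instead of staying silently broken. A minimal sketch, assuming the same Flink 1.x release that ships FlinkKafkaConsumer011; the intervals are illustrative values, not tuned recommendations.

package com.learn.flinkBatchMysql;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RobustDriverSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //take a checkpoint every 60 s (interval is an illustrative assumption)
        env.enableCheckpointing(60_000);
        //on failure, restart up to 3 times with a 10 s delay between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
        //add the Kafka source, window and sink exactly as in Driver above, then:
        env.execute("metricsCounter");
    }
}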

3. Sink class: use JDBC batching to write one window's data to MySQL.

package com.learn.flinkBatchMysql;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;


/**
 * Batch-sinks each window's data into MySQL.
 */
public class SinkToMySQL extends RichSinkFunction<List<String>> {
    private PreparedStatement ps;
    private BasicDataSource dataSource;
    private Connection connection;

    /**
     * Establish the connection in open() so it does not have to be created and released on every invoke() call
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new BasicDataSource();
        connection = getConnection(dataSource);
        String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        //close the statement before the connection that created it
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
        if (dataSource != null) {
            dataSource.close();
        }
    }

    /**
     * invoke() is called once for each batch of data
     */
    @Override
    public void invoke(List<String> value, Context context) throws Exception {
        //iterate over the records in this batch
        for (String student : value) {
            ps.setInt(1, 1);
            ps.setString(2, student);
            ps.setString(3, "123456");
            ps.setInt(4, 18);
            ps.addBatch();
        }
        int[] count = ps.executeBatch();  //execute the accumulated batch
        System.out.println("Successfully inserted " + count.length + " rows");
    }


    private static Connection getConnection(BasicDataSource dataSource) {
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        //Note: replace with your own local MySQL address, username and password
        dataSource.setUrl("jdbc:mysql://localhost:3306/novel");
        dataSource.setUsername("root");
        dataSource.setPassword("root");
        //configure the connection pool
        dataSource.setInitialSize(10);
        dataSource.setMaxTotal(50);
        dataSource.setMinIdle(2);

        Connection con = null;
        try {
            con = dataSource.getConnection();
            System.out.println("创建连接池:" + con);
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}
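The sink above obtains one connection in open() and holds it for the life of the task, so a database restart still invalidates it. One way to harden this, sketched below under stated assumptions: let dbcp2 validate connections and borrow a fresh one per batch. The validation query and eviction interval are illustrative; verify the setters against your commons-dbcp2 version.

package com.learn.flinkBatchMysql;

import org.apache.commons.dbcp2.BasicDataSource;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;

//Sketch only: pool settings that let dbcp2 detect and replace dead
//connections, plus a borrow-per-batch write path (values are assumptions).
public class ResilientPoolSketch {
    static BasicDataSource buildDataSource() {
        BasicDataSource ds = new BasicDataSource();
        ds.setDriverClassName("com.mysql.jdbc.Driver");
        ds.setUrl("jdbc:mysql://localhost:3306/novel");
        ds.setUsername("root");
        ds.setPassword("root");
        ds.setValidationQuery("SELECT 1");          //probe used to validate connections
        ds.setTestOnBorrow(true);                   //validate before handing one out
        ds.setTestWhileIdle(true);                  //also validate idle connections
        ds.setTimeBetweenEvictionRunsMillis(30_000);
        return ds;
    }

    //Borrow per batch instead of holding one connection forever; a dead
    //connection is then replaced by the pool on the next borrow.
    static void writeBatch(BasicDataSource ds, List<String> batch) throws Exception {
        try (Connection conn = ds.getConnection();
             PreparedStatement ps = conn.prepareStatement(
                     "insert into Student(id, name, password, age) values(?, ?, ?, ?)")) {
            for (String student : batch) {
                ps.setInt(1, 1);
                ps.setString(2, student);
                ps.setString(3, "123456");
                ps.setInt(4, 18);
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }
}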

4. Sink class: use JDBC batching to write one window's data to Oracle.

package com.learn.flinkBatchMysql;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;


/**
 * Batch-sinks each window's data into Oracle.
 */
public class SinkToOracle extends RichSinkFunction<List<String>> {
    private PreparedStatement ps;
    private BasicDataSource dataSource;
    private Connection connection;

    /**
     * Establish the connection in open() so it does not have to be created and released on every invoke() call
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new BasicDataSource();
        connection = getConnection(dataSource);
        String sql = "insert into STUDENT(ID,NAME,PASSWORD,AAAA) values(?, ?, ?, ?)";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        //close the statement before the connection that created it
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
        if (dataSource != null) {
            dataSource.close();
        }
    }

    /**
     * invoke() is called once for each batch of data
     */
    @Override
    public void invoke(List<String> value, Context context) throws Exception {
        //iterate over the records in this batch
        for (String student : value) {
            ps.setInt(1, 1);
            ps.setString(2, student);
            ps.setString(3, "123456");
            ps.setInt(4, 18);
            ps.addBatch();
        }
        int[] count = ps.executeBatch();  //execute the accumulated batch
        System.out.println("Successfully inserted " + count.length + " rows");
    }


    private static Connection getConnection(BasicDataSource dataSource) {
        dataSource.setDriverClassName("oracle.jdbc.driver.OracleDriver");
        //Note: replace with your own Oracle address, username and password
        dataSource.setUrl("jdbc:oracle:thin:@localhost:1521:xe");
        dataSource.setUsername("hr");
        dataSource.setPassword("hr");
        //configure the connection pool (the original bypassed the pool via
        //DriverManager, which silently ignored these settings)
        dataSource.setInitialSize(10);
        dataSource.setMaxTotal(50);
        dataSource.setMinIdle(2);

        Connection con = null;
        try {
            con = dataSource.getConnection();
            System.out.println("Obtained pooled connection: " + con);
        } catch (Exception e) {
            System.out.println("-----------oracle get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}

With Oracle, it is advisable to write table and column names entirely in uppercase, to avoid errors such as "table or view does not exist" or "invalid column name".

5. Summary

Many examples online are simple demos that open a database connection and insert records into MySQL one at a time. If the data volume is large, that puts heavy write pressure on MySQL; to improve performance you have to write in batches. Take this article's setup: if a window aggregates tens of thousands of records per minute, writing them as one batch outperforms writing row by row by a wide margin.
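One caveat worth adding: with MySQL Connector/J, executeBatch() by default still sends the statements one by one; the driver only rewrites them into a true multi-row insert when rewriteBatchedStatements is enabled on the JDBC URL. A minimal sketch of the tweak in the MySQL getConnection() shown earlier (the parameter name is from Connector/J; verify it against your driver version):

        //enable true multi-row batching in Connector/J (assumption: the
        //classic com.mysql.jdbc driver used above supports this parameter)
        dataSource.setUrl("jdbc:mysql://localhost:3306/novel?rewriteBatchedStatements=true");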
