背景:近期项目需要,引入flink,研究了下flink,步步踩坑终于可以单独运行,也可发布到集群运行,记录下踩坑点。开发环境:idea+springboot(2.3.5.RELEASSE)+kafka(2.8.1)+mysql(8.0.26)。废话不多说,直接上可执行代码。

以下代码实现了某个时间间隔,设备不上传数据,判断为离线的逻辑

一、项目application创建

/**
 * flink任务提交application
 *
 * @author wangfenglei
 */
@SpringBootApplication(scanBasePackages = {"com.wfl.firefighting.flink","com.wfl.firefighting.util"})
public class DataAnalysisFlinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(DataAnalysisFlinkApplication.class, args);
    }
}

二、设备状态计算主体,从kafka接收数据,然后通过KeyedProcessFunction函数进行计算,然后把离线设备输出到mysql sink,更新设备状态

/**
 * 从kafka读取数据,计算设备状态为离线后写入mysql
 *
 * @author wangfenglei
 */
@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-status", havingValue = "true", matchIfMissing = false)
public class DeviceDataKafkaSource {
    private static final Logger log = LoggerFactory.getLogger(CalDeviceOfflineFunction.class);
    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String kafkaServer;
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String loginConfig;
    @Value("${customer.flink.cal-device-status-topic}")
    private String topic;
    @Autowired
    private ApplicationContext applicationContext;

    /**
     * 执行方法
     *
     * @throws Exception 异常
     */
    @PostConstruct
    public void execute() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(1);
        Properties properties = new Properties();
        //kafka的节点的IP或者hostName,多个使用逗号分隔
        properties.setProperty("bootstrap.servers", kafkaServer);
        //kafka的消费者的group.id
        properties.setProperty("group.id", "data-nanlysis-flink-devicestatus");
        //设置kafka安全认证机制为PLAIN
        properties.setProperty("sasl.mechanism", "PLAIN");
        //设置kafka安全认证协议为SASL_PLAINTEXT
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        //设置kafka登录验证用户名和密码
        properties.setProperty("sasl.jaas.config", loginConfig);

        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);

        DataStream<String> stream = env.addSource(myConsumer);
        stream.print().setParallelism(1);

        DataStream<String> deviceStatus = stream
                //进行转换只获取设备序列码
                .map(data -> CommonConstant.GSON.fromJson(data, MsgData.class).getDevSn())
                //按照设备序列码分组
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                })
                //进行计算,判断周期内是否有新数据上传,没有则输出认为设备离线
                .process((CalDeviceOfflineFunction) applicationContext.getBean("calDeviceOfflineFunction"));

        //写入数据库
        deviceStatus.addSink((SinkFunction) applicationContext.getBean("deviceStatusSink"));

        //启动任务
        new Thread(() -> {
            try {
                env.execute("deviceStatusFlinkJob");
            } catch (Exception e) {
                log.error(e.toString(), e);
            }
        }).start();
    }
}

说明:

1、通过@ConditionalOnProperty开关形式控制程序是否执行,后续此模块可以开发多个flink执行任务,通过开关的形式提交需要的job

2、通过springboot的@PostConstruct注解,让项目application启动时,自动执行job

3、用Thread线程执行任务提交,否则application启动时会一直flink执行中

4、日志打印,需要使用slf4j,跟flink自己依赖jar包打印日志保持一致,如此在flink集群执行时可以打印日志

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final Logger log = LoggerFactory.getLogger(CalDeviceOfflineFunction.class);

5、kafka连接开启了登录验证,配置见application.yml。kafka登录验证server端配置见官网资料,后续有时间写个文章记录下

三、设备离线计算


/**
 * KeyedProcessFunction 为每个设备序列码维护一个state,并且会把间隔时间内(事件时间)内没有更新的设备序列码输出:
 * 对于每条记录,CalDeviceOfflineFunction 修改最后的时间戳。
 * 该函数还会在间隔时间内调用回调(事件时间)。
 * 每次调用回调时,都会检查存储的最后修改时间与回调的事件时间时间戳,如果匹配则发送设备序列码(即在间隔时间内没有更新,表示没有设备数据上传)
 *
 * @author wangfenglei
 */

@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-status", havingValue = "true", matchIfMissing = false)
public class CalDeviceOfflineFunction extends KeyedProcessFunction<String, String, String> {
    private static final Logger log = LoggerFactory.getLogger(CalDeviceOfflineFunction.class);
    /**
     * 这个状态是通过 ProcessFunction维护
     */
    private ValueState<DeviceLastDataTimestamp> deviceState;
    /**
     * 定时任务执行时间
     */
    private ValueState<Long> timerState;
    @Autowired
    private DeviceService deviceService;

    @Override
    public void open(Configuration parameters) throws Exception {
        deviceState = getRuntimeContext().getState(new ValueStateDescriptor<>("deviceState", DeviceLastDataTimestamp.class));
        timerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timerState", Long.class));
    }

    /**
     * 每条数据执行过程
     *
     * @param value 输入数据
     * @param ctx   环境
     * @param out   输出数据
     * @throws Exception 异常
     */
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        log.info("++++++++++++++fink recevice deviceSn={}", value);
        // 查看当前计数
        DeviceLastDataTimestamp current = deviceState.value();
        if (current == null) {
            current = new DeviceLastDataTimestamp();
            current.key = value;
            current.lastDataTime = ctx.timestamp();
        }

        Long currentTimerState = timerState.value();
        if (null == currentTimerState) {
            //初始值设置为-1
            timerState.update(-1L);
        }

        if (-1 != timerState.value()) {
            //删除原先定时任务,然后重新注册新的定时任务
            ctx.timerService().deleteProcessingTimeTimer(timerState.value());
        }

        long interval = deviceService.getDeviceOfflineInterval(value);
        // 设置状态的时间戳为记录的事件时间时间戳
        current.lastDataTime = ctx.timestamp();
        //设置判断离线时间间隔
        current.interval = interval;
        // 状态回写
        deviceState.update(current);
        //更新定时任务执行时间
        timerState.update(current.lastDataTime + interval);
        //注册新的定时任务
        ctx.timerService().registerProcessingTimeTimer(current.lastDataTime + interval);
    }

    /**
     * 定时器触发后执行的方法
     *
     * @param timestamp 这个时间戳代表的是该定时器的触发时间
     * @param ctx       定时器环境类
     * @param out       输出
     * @throws Exception 异常
     */
    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<String> out) throws Exception {
        // 取得该设备状态的State状态
        DeviceLastDataTimestamp result = deviceState.value();

        // timestamp是定时器触发时间,如果等于最后一次更新时间+离线间隔时间,就表示这十秒内没有收到过该设备报文了
        if (timestamp == result.lastDataTime + result.interval) {
            // 发送
            out.collect(result.key);
            // 打印数据,用于核对是否符合预期
            log.info("==================" + result.key + " is offline");
        }
    }

    /**
     * 设备最后上传数据时间戳数据类
     */
    class DeviceLastDataTimestamp {
        public String key;
        public long lastDataTime;
        public long interval;
    }
}

四、 更新设备离线状态


/**
 * 向mysql写入数据
 *
 * @author wangfenglei
 */
@Component
@ConditionalOnProperty(name = "customer.flink.cal-device-status", havingValue = "true", matchIfMissing = false)
public class DeviceStatusSink extends RichSinkFunction<String> {
    private static final Logger log = LoggerFactory.getLogger(DeviceStatusSink.class);
    @Value("${spring.datasource.dynamic.datasource.master.url}")
    private String datasoureUrl;
    @Value("${spring.datasource.dynamic.datasource.master.username}")
    private String userName;
    @Value("${spring.datasource.dynamic.datasource.master.password}")
    private String password;
    @Value("${spring.datasource.dynamic.datasource.master.driver-class-name}")
    private String driverClass;
    private Connection conn = null;
    private PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        //加载驱动,开启连接
        try {
            Class.forName(driverClass);
            conn = DriverManager.getConnection(datasoureUrl, userName, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void invoke(String deviceSn, Context context) {
        try {
            String sql = "update biz_device t set t.status=2 where t.dev_sn=?";
            ps = conn.prepareStatement(sql);
            ps.setString(1, deviceSn);
            ps.executeUpdate();
            log.info("update biz_device t set t.status=2 where t.dev_sn={}", deviceSn);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 结束任务,关闭连接
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
        if (ps != null) {
            ps.close();
        }
    }
}

五、application.yml配置

server:
  port: 8099

spring:
  autoconfigure:
      exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
  datasource:
    druid:
      stat-view-servlet:
        enabled: true
        loginUsername: admin
        loginPassword: 123456
        allow:
      web-stat-filter:
        enabled: true
    dynamic:
      druid: # 全局druid参数,绝大部分值和默认保持一致。(现已支持的参数如下,不清楚含义不要乱设置)
        # 连接池的配置信息
        # 初始化大小,最小,最大
        initial-size: 5
        min-idle: 5
        maxActive: 20
        # 配置获取连接等待超时的时间
        maxWait: 60000
        # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
        timeBetweenEvictionRunsMillis: 60000
        # 配置一个连接在池中最小生存的时间,单位是毫秒
        minEvictableIdleTimeMillis: 300000
        validationQuery: SELECT 1 FROM DUAL
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        # 打开PSCache,并且指定每个连接上PSCache的大小
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        # 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙
        filters: stat,wall,slf4j
        # 通过connectProperties属性来打开mergeSql功能;慢SQL记录
        connectionProperties: druid.stat.mergeSql\=true;druid.stat.slowSqlMillis\=5000
      datasource:
        master:
          url: jdbc:mysql://127.0.0.1:3306/fire?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai
          username: root
          password: root
          driver-class-name: com.mysql.cj.jdbc.Driver
  kafka:
    bootstrap-servers: 127.0.0.1:9092 # 指定kafka 代理地址,可以多个
    producer: # 生产者
      retries: 1 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      # 每次批量发送消息的数量
      batch-size: 16384
      buffer-memory: 33554432
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #修改最大向kafka推送消息大小
      properties:
        max.request.size: 52428800
    consumer:
      group-id: data-analysis-flink
      #手动提交offset保证数据一定被消费
      enable-auto-commit: false
      #指定从最近地方开始消费(earliest)
      auto-offset-reset: latest
      #消费者组
      #group-id: dev
    properties:
      #服务端没有收到心跳超时时间,设置长点以防调试时超时
      session:
        timeout:
          ms: 60000
      heartbeat:
        interval:
          ms: 30000
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: PLAIN
        jaas:
          config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="root" password="root";'
#自定义配置
customer:
  #flink相关配置
  flink:
    #是否开启设置状态计算
    cal-device-status: true
    cal-device-status-topic: device-upload-data

六、pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>com.wfl.firefighting</groupId>
		<artifactId>data-analysis</artifactId>
		<version>1.0.0</version>
	</parent>
	<groupId>com.wfl.firefighting</groupId>
	<artifactId>data-analysis-flink</artifactId>
	<version>1.0.0</version>
	<packaging>jar</packaging>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.springframework.boot</groupId>
					<artifactId>spring-boot-starter-logging</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>com.wfl.firefighting</groupId>
			<artifactId>data-analysis-service</artifactId>
			<version>1.0.0</version>
		</dependency>
		<dependency>
			<groupId>com.wfl.firefighting</groupId>
			<artifactId>data-analysis-model</artifactId>
			<version>1.0.0</version>
		</dependency>
       <dependency>
           <groupId>io.github.openfeign</groupId>
           <artifactId>feign-httpclient</artifactId>
           <version>10.10.1</version>
       </dependency>
       <!-- druid -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>druid-spring-boot-starter</artifactId>
			<version>1.1.22</version>
		</dependency>
		<!-- 动态数据源 -->
		<dependency>
			<groupId>com.baomidou</groupId>
			<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
			<version>2.5.4</version>
		</dependency>
		<!--mysql-->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>8.0.20</version>
		</dependency>

		<!-- flink依赖引入 开始-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>1.13.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.11</artifactId>
			<version>1.13.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.11</artifactId>
			<version>1.13.1</version>
		</dependency>
		<!-- flink连接kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_2.11</artifactId>
			<version>1.13.1</version>
		</dependency>
		<!-- flink连接es-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>1.13.1</version>
		</dependency>
		<!-- flink连接mysql-->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-jdbc_2.11</artifactId>
			<version>1.10.0</version>
		</dependency>
		<!-- flink依赖引入 结束-->
	</dependencies>

	<build>
		<finalName>data-analysis-flink</finalName>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.2.4</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<createDependencyReducedPom>false</createDependencyReducedPom>
							<artifactSet>
								<excludes>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<artifact>*:*</artifact>
									<excludes>
										<exclude>module-info.class</exclude>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
									<transformer
											implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
										<resource>META-INF/spring.handlers</resource>
										<resource>reference.conf</resource>
									</transformer>
									<transformer
											implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
										<resource>META-INF/spring.factories</resource>
									</transformer>
									<transformer
											implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
										<resource>META-INF/spring.schemas</resource>
									</transformer>
									<transformer
											implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
									<transformer
											implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
										<mainClass>com.wfl.firefighting.flink.DataAnalysisFlinkApplication</mainClass>
									</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

说明: 

1、如果使用local执行方式,不需要提交到flink服务端执行job,可以使用spring-boot-maven-plugin,直接java -jar执行即可,如下:

<build>
    <finalName>data-analysis-flink</finalName>
    <plugins>
         <plugin>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-maven-plugin</artifactId>
             <!-- 指定启动入口 -->
             <configuration>
                 <mainClass>com.wfl.firefighting.flink.DataAnalysisFlinkApplication</mainClass>
             </configuration>
             <executions>
                 <execution>
                     <goals>
                         <!--可以把依赖的包都打包到生成的Jar包中-->
                         <goal>repackage</goal>
                     </goals>
                 </execution>
             </executions>
         </plugin>
     </plugins>
 </build>

使用spring-boot-maven-plugin打的jar包,提交到flink集群端执行,会报错,提示找不到类,因为springboot默认打包BOOT-INF目录,flink服务端执行会提示找不到类。使用maven-shade-plugin打包,既可以用java -jar执行,也可以提交到flink服务端执行。

2、maven-shade-plugin打的jar包,如果提交到服务端执行,需要去掉springboot默认集成的logback,否则服务端执行报错,提示Caused by: java.lang.IllegalArgumentException: LoggerFactory is not a Logback LoggerContext but Logback is on the classpath,如下:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
   <exclusions>
      <exclusion>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-logging</artifactId>
      </exclusion>
   </exclusions>
</dependency>

如果本地执行java -jar形式,需要在build的中注释掉以下内容,否则启动报错提示:java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory

<!--<exclude>org.slf4j:*</exclude>-->
<!--<exclude>log4j:*</exclude>-->

3、使用maven-shade-plugin打包,必须添加如下,否则提示Cannot find 'resource' in class org.apache.maven.plugins.shade.resource.ManifestResourceTransformer

<transformers>
      <transformer
            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
         <resource>META-INF/spring.handlers</resource>
         <resource>reference.conf</resource>
      </transformer>
      <transformer
            implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
         <resource>META-INF/spring.factories</resource>
      </transformer>
      <transformer
            implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
         <resource>META-INF/spring.schemas</resource>
      </transformer>
      <transformer
            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
      <transformer
            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
         <mainClass>com.wfl.firefighting.flink.DataAnalysisFlinkApplication</mainClass>
      </transformer>
</transformers>

七、执行效果:

1、本地执行

2、提交到flink集群执行 

八、其他踩坑点

1、报错提示:The RemoteEnvironment cannot be instantiated when running in a pre-defined context

解决方法:将StreamExecutionEnvironment修改为getExecutionEnvironment,获取当前执行环境

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2、报错提示:  Insufficient number of network buffers: required 65, but only 38 available. The total number of network buffers is currently set to 2048 of 32768 bytes each.

解决办法:env.setParallelism(1)

env.setParallelism(1);

3、报错提示: Caused by: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'

解决办法: flink 配置文件里 flink-conf.yaml设置
classloader.check-leaked-classloader: false

Logo

更多推荐