四、自定义sink

1.自定义sink

package code.book.stream.customsinkandsource.jdbc.java;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class StudentSinkToMysql extends RichSinkFunction<Student> {
    private Connection connection = null;
    private PreparedStatement ps = null;

    /** * 一、open()方法中建立连接,这样不用每次invoke的时候都要建立连接和释放连接。 */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        String driver = "com.mysql.jdbc.Driver";
        String url = "jdbc:mysql://qingcheng11:3306/flinktest";
        String username = "root";
        String password = "qingcheng";
        //1.加载驱动
        Class.forName(driver);
        //2.创建连接
        connection = DriverManager.getConnection(url, username, password);
        String sql = "insert into Student(stuid,stuname,stuaddr,stusex)values(?,?,?,?);";
        //3.获得执行语句
        ps = connection.prepareStatement(sql);
    }


    /** * 二、每个元素的插入都要调用一次invoke()方法,这里主要进行插入操作 */
    @Override
    public void invoke(Student student) throws Exception {
        try {
            //4.组装数据,执行插入操作
            ps.setInt(1, student.getStuid());
            ps.setString(2, student.getStuname());
            ps.setString(3, student.getStuaddr());
            ps.setString(4, student.getStusex());
            ps.executeUpdate();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        //5.关闭连接和释放资源
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }
}

2.sink测试程序

package code.book.stream.customsinkandsource.jdbc.java;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StudentSinkToMysqlTest {
    public static void main(String[] args) throws Exception {
        //1.创建流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.准备数据
        DataStream<Student> students = env.fromElements(
                new Student(11, "zhangsan", "beijing biejing", "female"),
                new Student(12, "lisi", "tainjing tianjin", "male ")
        );

        //3.将数据写入到自定义的sink中(这里是mysql)
        students.addSink(new StudentSinkToMysql());

        //4.触发流执行
        env.execute();
    }
}

3.sink测试效果

在mysql中能够查询到,flink写入的数据。

这里写图片描述

更多相关文章

  1. 使用SWT/JFace与WindowBuilder绑定数据的参考资料
  2. 01500105_MLDN-魔乐科技-李兴华【Java核心技术】_JDBC连接Oracle
  3. 数据结构-java与c实现带头结点的单链表

随机推荐

  1. Android SQLite存取图像
  2. android 调用draw(canvas) 函数自动退出
  3. Android支持multiDexEnabled,自建脚本编译
  4. android 加载模式
  5. Android(安卓)布局 屏幕适配
  6. Android动态设置控件大小以及设定margin
  7. 【翻译】(26)Android如何绘画视图
  8. Android HAL 开发 (1)
  9. android解析xml文件 Android DOM解析XML
  10. Android Button 控件绑定单击事件