分布式缓存

Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。 此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。 当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。

示例

在ExecutionEnvironment中注册一个文件:

//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//1:注册一个文件,可以使用hdfs上的文件 也可以是本地文件进行测试
env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt"); 复制代码

在用户函数中访问缓存文件或者目录(这里是一个map函数)。这个函数必须继承RichFunction,因为它需要使用RuntimeContext读取数据:

DataSet<String> result = data.map(new RichMapFunction<String, String>() {
            private ArrayList<String> dataList = new ArrayList<String>();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //2:使用文件
                File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt"); List<String> lines = FileUtils.readLines(myFile); for (String line : lines) { this.dataList.add(line); System.err.println("分布式缓存为:" + line); } } @Override public String map(String value) throws Exception { //在这里就可以使用dataList System.err.println("使用datalist:" + dataList + "------------" +value); //业务逻辑 return dataList +":" + value; } }); result.printToErr(); } 复制代码

完整代码如下,仔细看注释:


public class DisCacheTest {

    public static void main(String[] args) throws Exception{

        //获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //1:注册一个文件,可以使用hdfs上的文件 也可以是本地文件进行测试
      //text 中有4个单词:hello flink hello FLINK env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/text","a.txt"); DataSource<String> data = env.fromElements("a", "b", "c", "d"); DataSet<String> result = data.map(new RichMapFunction<String, String>() { private ArrayList<String> dataList = new ArrayList<String>(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //2:使用文件 File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt"); List<String> lines = FileUtils.readLines(myFile); for (String line : lines) { this.dataList.add(line); System.err.println("分布式缓存为:" + line); } } @Override public String map(String value) throws Exception { //在这里就可以使用dataList System.err.println("使用datalist:" + dataList + "------------" +value); //业务逻辑 return dataList +":" + value; } }); result.printToErr(); } }// 复制代码

输出结果如下:

[hello, flink, hello, FLINK]:a
[hello, flink, hello, FLINK]:b
[hello, flink, hello, FLINK]:c
[hello, flink, hello, FLINK]:d
复制代码

公众号推荐

  • 全网唯一一个从0开始帮助Java开发者转做大数据领域的公众号~
  • 海量【java和大数据的面试题+视频资料】整理在公众号,关注后可以下载~
  • 更多大数据技术欢迎和作者一起探讨~

更多相关文章

  1. java基础IO删除文件夹文件
  2. java动态加载jar文件
  3. IT兄弟连 JavaWeb教程 文件上传技术
  4. 在使用visualvm进行分析时,如何避免“Profiler代理警告:无法查找缓
  5. JAVA 实现tail -f 日志文件监控功能
  6. 读取Excel文件并跳过空行但不是空列

随机推荐

  1. 新手应该知道的php多图片上传的实现
  2. php构建一个区块链(含源码)
  3. PHP global 关键词的实例详解
  4. use在php中的使用方法(代码示例)
  5. PHP高级应用的讲解
  6. php的定界符<<<EOF的解析
  7. PHP Composer是什么技术?一起看看
  8. Mac搭建php的开发环境(图文详解)
  9. php获取mp3音频信息实例教程
  10. utf8和utf8mb4的区别详解