Copyright notice: This technical column series summarizes and distills the author's (Qin Kaixin) day-to-day work. It draws cases from real business environments and offers tuning suggestions for business applications, capacity planning for cluster environments, and related content. Please keep following this blog series. No reprinting; you are welcome to study it. QQ email address: [email protected]. If you have any questions, feel free to contact me at any time.

1 Distributed Cache

  • Flink provides a distributed cache, similar to Hadoop's DistributedCache, that lets parallel user functions read files from the local disk. Each file is shipped to the TaskManager nodes once, so tasks do not pull it repeatedly.
  • The cache works as follows: the program registers a file or directory (on a local or remote file system such as HDFS or S3) through the ExecutionEnvironment and gives it a name. When the program executes, Flink automatically copies the file or directory to the local file system of every TaskManager node, once per node. User functions can then look it up by the registered name and read it from the TaskManager node's local file system.
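To make the "once per TaskManager" rule concrete, here is a toy model in plain Java. It is only an illustration of the idea, not Flink's implementation: the class ToyDistributedCache, its methods, and the "/tmp/<taskManager>/<name>" local paths are all hypothetical. Registration records a name, deploying a task copies each registered entry only if that TaskManager does not hold it yet, and lookups go through the name.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of the distributed-cache idea (hypothetical, NOT Flink's code).
public class ToyDistributedCache {
    // registered name -> original path (like registerCachedFile)
    private final Map<String, String> registry = new HashMap<>();
    // registered name -> TaskManagers that already hold a local copy
    private final Map<String, Set<String>> copiedTo = new HashMap<>();
    private int transfers = 0;

    public void register(String path, String name) {
        registry.put(name, path);
    }

    // Deploying a task copies each registered entry at most once per TaskManager.
    public void deployTask(String taskManager) {
        for (String name : registry.keySet()) {
            Set<String> holders = copiedTo.computeIfAbsent(name, k -> new HashSet<>());
            if (holders.add(taskManager)) {
                transfers++; // first task on this TaskManager triggers the copy
            }
        }
    }

    // Tasks look the file up by name and get a (hypothetical) local path.
    public String getFile(String taskManager, String name) {
        Set<String> holders = copiedTo.get(name);
        if (holders == null || !holders.contains(taskManager)) {
            throw new IllegalArgumentException("'" + name + "' is not cached on " + taskManager);
        }
        return "/tmp/" + taskManager + "/" + name;
    }

    public int transfers() {
        return transfers;
    }

    public static void main(String[] args) {
        ToyDistributedCache cache = new ToyDistributedCache();
        cache.register("hdfs:///path/to/your/file", "hdfsFile");
        // 2 TaskManagers x 3 tasks each = 6 task deployments
        for (String tm : new String[] {"tm1", "tm2"}) {
            for (int task = 0; task < 3; task++) {
                cache.deployTask(tm);
            }
        }
        System.out.println(cache.transfers()); // prints 2
    }
}
```

Six tasks read the file, but only two copies are made, one per TaskManager; that is the saving the cache is designed for.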

2 Tips

  • 1: Register a file

      env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile");
  • 2: Access the data (inside a rich function, for example in its open() method)

      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
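Because the cache accepts a directory as well as a single file, the File returned by getFile may point to a directory. The helper below is a hypothetical sketch (CachedFileReader is not part of Flink) that reads every line under either kind of path using only the JDK, so it could be called from a rich function's open() method. It assumes the cached files are UTF-8 text.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for the File returned by getDistributedCache().getFile(name).
public class CachedFileReader {
    // Reads all lines from a file, or from every file under a directory.
    public static List<String> readAllLines(File cached) throws IOException {
        List<String> lines = new ArrayList<>();
        if (cached.isDirectory()) {
            File[] children = cached.listFiles();
            if (children == null) {
                throw new IOException("Cannot list " + cached);
            }
            Arrays.sort(children); // deterministic order by path name
            for (File child : children) {
                lines.addAll(readAllLines(child)); // recurse into subdirectories
            }
        } else {
            // Assumes UTF-8 text content
            lines.addAll(Files.readAllLines(cached.toPath(), StandardCharsets.UTF_8));
        }
        return lines;
    }
}
```

Sorting the directory listing keeps the line order stable across TaskManagers, since File.listFiles() makes no ordering guarantee.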

3 Practical application cases

3.1 Create a file discache.txt on drive D and register it with registerCachedFile

3.2 Each TaskManager keeps one local copy, so map tasks do not pull the file repeatedly

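    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.configuration.Configuration;

    import java.io.File;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.util.ArrayList;
    import java.util.List;

    public class BatchDemoDisCache {
        public static void main(String[] args) throws Exception {
            // Obtain the batch execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // 1: register the local file under the name "a.txt"
            env.registerCachedFile("d:\\discache.txt", "a.txt");
            DataSource<String> data = env.fromElements("a", "b", "c", "d");
            DataSet<String> result = data.map(new RichMapFunction<String, String>() {
                private final List<String> dataList = new ArrayList<>();

                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    // 2: look up the TaskManager's local copy by its registered name
                    File myFile = getRuntimeContext().getDistributedCache().getFile("a.txt");
                    List<String> lines = Files.readAllLines(myFile.toPath(), StandardCharsets.UTF_8);
                    for (String line : lines) {
                        this.dataList.add(line);
                        System.out.println("discache:" + line);
                    }
                }

                @Override
                public String map(String value) throws Exception {
                    // dataList is available here; this demo returns the input unchanged
                    return value;
                }
            });
            result.print();
        }
    }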
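In the demo, map() returns each value unchanged and dataList is only printed. A common use of a cached file is as a lookup table built once in open() and consulted for every record. The plain-Java sketch below mirrors that open()/map() split so the logic can be tested without a cluster; the class CachedLookup and the ":cached"/":unknown" tags are hypothetical, not part of the original program or of Flink.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper mirroring the open()/map() split of a RichMapFunction.
public class CachedLookup {
    private final Set<String> known = new HashSet<>();

    // Called once, like open(): load the cached lines into memory.
    public void load(List<String> cachedLines) {
        for (String line : cachedLines) {
            known.add(line.trim());
        }
    }

    // Called per record, like map(): tag the value with a membership flag.
    public String map(String value) {
        return value + (known.contains(value) ? ":cached" : ":unknown");
    }

    public static void main(String[] args) {
        CachedLookup lookup = new CachedLookup();
        lookup.load(Arrays.asList("flink", "spark", "hadoop", "kylin"));
        System.out.println(lookup.map("flink")); // prints flink:cached
        System.out.println(lookup.map("a"));     // prints a:unknown
    }
}
```

Building the set in open() rather than in map() means the file is parsed once per parallel instance instead of once per record.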

3.3 Results

discache:flink
discache:spark
discache:hadoop
discache:kylin
a
b
c
d

4 Conclusion

Each post covers one clear theme. Writing takes effort, so I value every reader. Thank you!

Qin Kaixin in Shenzhen 201811251732