Getting started with Apache Flink
Environment configuration
The settings to configure include:
- JAVA_HOME
- jobmanager.rpc.address
- jobmanager.heap.mb and taskmanager.heap.mb
- taskmanager.numberOfTaskSlots
- taskmanager.tmp.dirs
- the slaves file (one worker hostname per line)
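Put together, a minimal flink-conf.yaml touching these keys might look like the sketch below. The hostnames, paths, and sizes are purely illustrative; JAVA_HOME is usually taken from the shell environment (or set via the env.java.home key).

```yaml
# flink-conf.yaml — illustrative values only, adjust for your cluster
jobmanager.rpc.address: master-host
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 4
taskmanager.tmp.dirs: /tmp/flink
```

The slaves file is separate from flink-conf.yaml and simply lists the worker hostnames, one per line.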
Starting and stopping the cluster
bin/start-cluster.sh
bin/stop-cluster.sh
First program
A socket-based WordCount (with the imports and class declaration restored):

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketTextStreamWordCount {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }
        String hostName = args[0];
        Integer port = Integer.parseInt(args[1]);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        // get input data
        DataStream<String> text = env.socketTextStream(hostName, port);

        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new LineSplitter())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(0)
                        .sum(1);

        counts.print();

        // execute program
        env.execute("WordCount from SocketTextStream Example");
    }

    public static final class LineSplitter
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
```
Programming steps
The programming model is very similar to Spark's:
1. Obtain an execution environment
2. Load/create the initial data
3. Specify transformations on this data
4. Specify where to put the results of your computations
5. Trigger the program execution
Connecting to Flink
The entry point is StreamExecutionEnvironment, obtained via one of:
- getExecutionEnvironment()
- createLocalEnvironment()
- createRemoteEnvironment(String host, int port, String... jarFiles)

Accumulators & Counters
These are used for summing and counting across a job. The lifecycle has four steps: define the accumulator, register it with the runtime context, update it as records are processed, and fetch the result after the job finishes:

```java
// 1. define
private IntCounter numLines = new IntCounter();

// 2. register with the runtime context (inside a rich function)
getRuntimeContext().addAccumulator("num-lines", this.numLines);

// 3. update per record
this.numLines.add(1);

// 4. fetch the result once the job has finished
JobExecutionResult myJobExecutionResult = env.execute("xxx");
myJobExecutionResult.getAccumulatorResult("num-lines");
```
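To see those four steps in isolation, here is a plain-Java stand-in for the accumulator lifecycle. The IntCounter and registry classes below are minimal mock-ups written for this sketch, not the real Flink API; in an actual job you would use org.apache.flink.api.common.accumulators.IntCounter and the RuntimeContext instead.

```java
import java.util.HashMap;
import java.util.Map;

public class AccumulatorSketch {

    // stand-in for Flink's IntCounter (illustrative, not the real class)
    static class IntCounter {
        private int count = 0;
        void add(int value) { count += value; }
        int getLocalValue() { return count; }
    }

    // stand-in for the runtime context's accumulator registry
    static final Map<String, IntCounter> accumulators = new HashMap<>();

    public static void main(String[] args) {
        IntCounter numLines = new IntCounter();       // 1. define
        accumulators.put("num-lines", numLines);      // 2. register

        for (String line : new String[]{"a", "b", "c"}) {
            numLines.add(1);                          // 3. update per record
        }

        // 4. fetch the aggregated result
        int result = accumulators.get("num-lines").getLocalValue();
        System.out.println(result); // prints 3
    }
}
```

The key point the mock-up captures: user code only ever touches the local counter object, and the framework collects the named accumulators into the job result at the end.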
Setting parallelism
Parallelism can be configured at four levels:

System level (flink-conf.yaml):
parallelism.default: 10

Client level:
./bin/flink run -p 10 example.jar
client.run(program, 10, true);

Execution environment level:
env.setParallelism(3);

Operator level:

```java
DataStream<Tuple2<String, Integer>> wordCounts = text
        .flatMap(new LineSplitter())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1).setParallelism(5);
```
Finally, the architecture diagram and execution flow diagram (not reproduced here) also look quite similar to Spark's.