Flink Deep Dive (Flink Practice 20: Parallelism Explained)

1. Code implementation

package one;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @program: Flink_learn
 * @description: Setting parallelism.
 *               Per-operator parallelism takes precedence over the global parallelism.
 *               This program needs two task slots.
 * @author: Mr.逗
 * @create: 2021-09-14 15:40
 **/
public class Example3 {
    public static void main(String[] args) {
        // Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the global number of parallel tasks to 1
        // On its own this would need only 1 task slot
        env.setParallelism(1);

        // Read the source
        // Parallelism set to 1 (fromElements is a non-parallel source)
        DataStreamSource<String> stream = env
                .fromElements("hello world", "hello world")
                .setParallelism(1);

        // Map stage, implemented with flatMap here
        // map: emits exactly one element per input element
        // flatMap: emits 0, 1, or more elements per input element
        // Parallelism set to 2
        SingleOutputStreamOperator<WordWithCount> mappedStream = stream
                // input type: String; output type: WordWithCount
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String v, Collector<WordWithCount> out) throws Exception {
                        String[] words = v.split(" ");
                        for (String w : words) {
                            // Send data downstream with collect
                            out.collect(new WordWithCount(w, 1L));
                        }
                    }
                })
                .setParallelism(2);

        // Group by key (shuffle)
        // First type parameter: element type of the stream
        // Second type parameter: key type
        KeyedStream<WordWithCount, String> keyedStream = mappedStream
                .keyBy(new KeySelector<WordWithCount, String>() {
                    @Override
                    public String getKey(WordWithCount v) throws Exception {
                        return v.word;
                    }
                });

        // Reduce stage
        // reduce maintains an accumulator per key:
        // - the first element becomes the accumulator and is emitted as-is
        // - each later element is merged into the accumulator, which is then emitted
        // The accumulator has the same type as the stream elements
        SingleOutputStreamOperator<WordWithCount> reduce = keyedStream
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount v1, WordWithCount v2) throws Exception {
                        return new WordWithCount(v1.word, v1.count + v2.count);
                    }
                });

        // Print the result
        reduce.print();

        String name = Example3.class.getName();
        try {
            env.execute(name);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // POJO class:
    // 1. must be a public class
    // 2. all fields must be public
    // 3. must have a no-arg constructor
    // Mimics a Scala case class
    public static class WordWithCount {
        public String word;
        public Long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, Long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
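A note on why two task slots are needed: with Flink's default slot sharing, one task slot can host one parallel subtask of every operator in the job, so the number of slots required equals the job's highest operator parallelism, here 2 (the flatMap). The sketch below (hypothetical class ParallelismCheck, not part of the original example) shows that an operator-level setParallelism overrides the global env.setParallelism; getParallelism() only reads the configured plan, so no execute() call is needed:

package one;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical companion class, for illustration only.
public class ParallelismCheck {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // global default for every operator

        SingleOutputStreamOperator<String> upper = env
                .fromElements("hello world", "hello world") // non-parallel source, parallelism 1
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String v) {
                        return v.toUpperCase();
                    }
                })
                .setParallelism(2); // per-operator setting wins over the global default

        System.out.println("env default parallelism: " + env.getParallelism());   // 1
        System.out.println("map operator parallelism: " + upper.getParallelism()); // 2
    }
}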

2. Output

WordWithCount{word='hello', count=1}
WordWithCount{word='world', count=1}
WordWithCount{word='hello', count=2}
WordWithCount{word='world', count=2}
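Reading the output: the source emits "hello world" twice, so each key receives two (word, 1) records. The first record of a key becomes the accumulator and is emitted as-is (count=1); the second is merged into the accumulator, which is emitted again (count=2). A standalone hand trace of that rolling reduce, in plain Java with no Flink dependency (hypothetical class ReduceTrace, for illustration only):

// Hypothetical standalone trace (no Flink) of the per-key rolling reduce.
public class ReduceTrace {
    public static void main(String[] args) {
        long[] arrivals = {1L, 1L}; // two (hello, 1) records, one per input line
        Long acc = null;            // per-key accumulator, empty before the first record
        for (long c : arrivals) {
            acc = (acc == null) ? c : acc + c; // first record seeds, later records merge
            System.out.println("WordWithCount{word='hello', count=" + acc + "}");
        }
    }
}

Also note that print() runs at the global parallelism of 1 here, so the lines carry no subtask prefix; at a parallelism greater than 1, Flink's print sink prefixes each line with the subtask index, e.g. "1> ".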
