# Flink Streaming API
# Setting up the Maven project FlinkTutorial: the pom file

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.flink</groupId>
    <artifactId>FlinkTutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>
</project>
```
# Batch WordCount

Put the following code in `src/main/java/com.atguigu.wc/WordCount.java`:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // Create the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the input data from a file
        String inputPath = "hello.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);

        // Split each line on spaces, group by the word (field 0),
        // then sum the counts (field 1)
        DataSet<Tuple2<String, Integer>> wordCountDataSet = inputDataSet
                .flatMap(new MyFlatMapper())
                .groupBy(0)
                .sum(1);

        // Print the result
        wordCountDataSet.print();
    }

    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
```
# Stream WordCount

Put the following code in `src/main/java/com.atguigu.wc/StreamWordCount.java`:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read host and port from the program arguments (--host, --port)
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");

        // Read a text stream from the socket
        DataStream<String> inputDataStream = env.socketTextStream(host, port);

        // Reuse MyFlatMapper from the batch job, key by the word (field 0),
        // then sum the counts (field 1)
        DataStream<Tuple2<String, Integer>> wordCountDataStream = inputDataStream
                .flatMap(new WordCount.MyFlatMapper())
                .keyBy(0)
                .sum(1);

        wordCountDataStream.print().setParallelism(1);

        // Trigger the job execution
        env.execute();
    }
}
```
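If you do not want to pass `--host` and `--port` on every run, `ParameterTool` also has overloads that take a default value. A small variation for local testing (the `localhost`/`7777` defaults are an assumption, not part of the original example):

```java
// Fall back to assumed local-test defaults when no arguments are given
String host = parameterTool.get("host", "localhost");
int port = parameterTool.getInt("port", 7777);
```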
To test it, use the netcat command on a Linux system to send data:

```bash
nc -lk 7777
```
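With nc listening, start StreamWordCount with program arguments that match, for example `--host localhost --port 7777` (these values are just an example for a local test). Every line typed into the nc session should then appear as updated word counts in the job's output.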