MacOS安装Apache Flink

apache-flink

本文仅对MacOS下安装Apache Flink做一个记录。

安装

按照官网介绍:

1
2
3
4
$ brew install apache-flink
...
$ flink --version
Version: 1.2.0, Commit ID: 1c659cf

但是安装的时候报错:

1
2
3
4
5
6
7
8
9
10
11
12
$ brew install apache-flink
Updating Homebrew...
^C
==> Downloading https://www.apache.org/dyn/closer.lua?path=flink/flink-1.7.1/flink-1.7.1-bin-hadoop27-scala_2.11.tgz
==> Downloading from https://mirror.serverion.com/apache/flink/flink-1.7.1/flink-1.7.1-bin-hadoop27-scala_2.11.tgz
curl: (22) The requested URL returned error: 404 Not Found
Trying a mirror...
==> Downloading https://downloads.apache.org/flink/flink-1.7.1/flink-1.7.1-bin-hadoop27-scala_2.11.tgz
curl: (22) The requested URL returned error: 404 Not Found
Error: An exception occurred within a child process:
DownloadError: Failed to download resource "apache-flink"
Download failed: https://downloads.apache.org/flink/flink-1.7.1/flink-1.7.1-bin-hadoop27-scala_2.11.tgz

没办法,只能尝试另一种方式安装,先看下信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ brew info apache-flink
apache-flink: stable 1.7.1, HEAD
Scalable batch and stream data processing
https://flink.apache.org/
Not installed
From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/apache-flink.rb
==> Requirements
Required: java = 1.8 ✔
==> Options
--HEAD
Install HEAD version
==> Analytics
install: 589 (30 days), 1,887 (90 days), 11,291 (365 days)
install_on_request: 586 (30 days), 1,882 (90 days), 11,253 (365 days)
build_error: 0 (30 days)

打开安装文件地址:https://github.com/Homebrew/homebrew-core/blob/master/Formula/apache-flink.rb,如果要切换版本安装,查看文件修改记录,找到对应版本的安装文件,如1.11.0,尝试安装:

1
2
3
4
5
6
$ brew install https://github.com/Homebrew/homebrew-core/blob/fdf9f29591b9463ed62199d9a9f9865951a52cff/Formula/apache-flink.rb
Updating Homebrew...
^C
Error: Calling Non-checksummed download of apache-flink formula file from an arbitrary URL is disabled! Use 'brew extract' or 'brew create' and 'brew tap-new' to create a formula file in a tap on GitHub instead.
If reporting this issue please do so at (not Homebrew/brew or Homebrew/core):
https://github.com/Homebrew/homebrew-core/issues/new

还是失败,那将文件下载到本地,然后再安装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ brew install apache-flink.rb
Updating Homebrew...
^CWarning: You are using macOS 10.13.
We (and Apple) do not provide support for this old version.
You will encounter build failures with some formulae.
Please create pull requests instead of asking for help on Homebrew's GitHub,
Twitter or any other official channels. You are responsible for resolving
any issues you experience while you are running this
old version.

==> Downloading https://www.apache.org/dyn/closer.lua?path=flink/flink-1.11.0/flink-1.11.0-bin-scala_2.12.tgz
==> Downloading from https://mirror.koddos.net/apache/flink/flink-1.11.0/flink-1.11.0-bin-scala_2.12.tgz

curl: (22) The requested URL returned error: 404 Not Found
Trying a mirror...
==> Downloading https://archive.apache.org/dist/flink/flink-1.11.0/flink-1.11.0-bin-scala_2.12.tgz
######################################################################## 100.0%
🍺 /usr/local/Cellar/apache-flink/1.11.0: 165 files, 333.5MB, built in 6 seconds
==> `brew cleanup` has not been run in 30 days, running now...
Removing: /Users/yier/Library/Logs/Homebrew/zookeeper... (64B)
Removing: /Users/yier/Library/Logs/Homebrew/nginx... (64B)
Removing: /Users/yier/Library/Logs/Homebrew/kafka... (64B)
Pruned 1 symbolic links and 1 directories from /usr/local

安装成功,查看版本信息:

1
2
$ flink --version
Version: 1.11.0, Commit ID: d04872d

安装目录可以通过命令 brew info apache-flink 获得:

1
2
3
4
5
6
$ brew info apache-flink
apache-flink: stable 1.7.1, HEAD
Scalable batch and stream data processing
https://flink.apache.org/
/usr/local/Cellar/apache-flink/1.11.0 (170 files, 333.7MB) * // 这里
...

启动

1
2
$ cd /usr/local/Cellar/apache-flink/1.11.0/
$ ./libexec/bin/start-cluster.sh

访问地址:http://localhost:8081/

flink-console

测试

新建Maven项目,加入依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<properties>
<jdk.version>1.8</jdk.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.10.2</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>${jdk.version}</source>
<target>${jdk.version}</target>
<showWarnings>true</showWarnings>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>


<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.6</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<!-- 打包jar指定启动类 -->
<mainClass>org.example.flink.WindowWordCount</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>

复制官方示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package org.example.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


public class WindowWordCount {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);

dataStream.print();

env.execute("Window WordCount");
}

public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}

执行打包命令:

1
mvn clean package -Dmaven.test.skip=true

监听本地端口:

1
$ nc -lk 9999

执行:

1
$ ./libexec/bin/flink run   ${yourPath}/flink-example-1.0-SNAPSHOT.jar

同时查看日志:

1
$ tail -100f libexec/log/flink-${yourName}-taskexecutor-0-${yourName}.out

在nc中输入任意单词,可以在日志中看到单词的统计数据(出现次数):

Just type some words hitting return for a new word. These will be the input to the word count program. If you want to see counts greater than 1, type the same word again and again within 5 seconds (increase the window size from 5 seconds if you cannot type that fast ☺).

1
2
3
4
5
6
7
8
9
10
# 在nc中输入
BBQ BBQ AVC
ACV
aC

# 日志中输出
(BBQ,2)
(ACV,1)
(AVC,1)
(aC,1)

关闭

1
$ ./libexec/bin/stop-cluster.sh

参考

Mac中使用brew安装指定版本软件包
官方文档