亚洲成a人片在线观看69,中文字幕免费无线观看,日韩精品无码一区二区三区四区,92精品国产自产在线观看直播,亚洲精品无码不卡在线观看屁,亚洲成av人片在www色猫咪

Flink 寫(xiě)入數據到 Kafka

前言

通過(guò)Flink官網(wǎng)可以看到Flink里面就默認支持了不少sink,比如也支持Kafka sink connector(FlinkKafkaProducer),那么這篇文章我們就來(lái)看看如何將數據寫(xiě)入到Kafka。

準備

Flink里面支持Kafka 0.8、0.9、0.10、0.11.

這里我們需要安裝下Kafka,請對應添加對應的Flink Kafka connector依賴(lài)的版本,這里我們使用的是0.11 版本:

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
</dependency>

目前我們先看下本地Kafka是否有這個(gè)student-write topic呢?需要執行下這個(gè)命令:

./kafka-topics.sh --list --zookeeper localhost:2181

執行結果:

Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=on -Dswing.aatext=true  
__consumer_offsets  
metric  
student

如果等下我們的程序運行起來(lái)后,再次執行這個(gè)命令出現student-write topic,那么證明我的程序確實(shí)起作用了,已經(jīng)將其他集群的Kafka數據寫(xiě)入到本地Kafka了。

代碼

package com.thinker.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.*;

import java.util.Properties;

/**
 * @author zeekling [lingzhaohui@zeekling.cn]
 * @version 1.0
 * @apiNote
 * @since 2020-05-14
 */
public class FlinkSinkToKafka {

    private static final String READ_TOPIC = "student-write";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "student-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");
        DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer011<>(
                READ_TOPIC,   //這個(gè) kafka topic 需要和上面的工具類(lèi)的 topic 一致
                new SimpleStringSchema(),
                props)).setParallelism(1);
        student.print();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "student-write");

        student.addSink(new FlinkKafkaProducer011<String>(
                "student-write",
                new SimpleStringSchema(),
                properties
        )).name("flink-connectors-kafka").setParallelism(1);
        student.print();
        env.execute("flink learning connectors kafka");
    }

}

運行程序

將下面列舉出來(lái)的包拷貝到flink對應的目錄下面,并且重啟flink。

5c11a9d680d1c93a2bdb91446927f1b1.png

執行下面命令提交flink任務(wù)

./bin/flink run -c com.thinker.kafka.FlinkSinkToKafka ~/project/flink-test/target/flink-test-1.0-SNAPSHOT.jar

提交成功后執行下面命令:

/kafka-topics.sh --list --zookeeper localhost:2181

執行結果:
e423b642a34d6040bc085920cce13ff9.png

# 轉載  flink 


標 題:《Flink 寫(xiě)入數據到 Kafka
作 者:zeekling
提 示:轉載請注明文章轉載自個(gè)人博客:浪浪山旁那個(gè)村

評論

取消
亚洲成a人片在线观看69,中文字幕免费无线观看,日韩精品无码一区二区三区四区,92精品国产自产在线观看直播,亚洲精品无码不卡在线观看屁,亚洲成av人片在www色猫咪 亚洲高清一区二区三区不卡| 在线播放久久大蕉香蕉免费| 久久久久国产一区二区三区| 亚洲制服丝袜一区二区三区| 亚洲图揄拍自拍色综合| 中文字幕一永久免费观看电视剧| 欧美人与人动人物2020| 日韩欧美一区二区三区免费观看| 亚洲制服丝袜另类经典| 亚洲欧美v国产蜜芽tv| 伊人久久大香线蕉影院| 少妇人妻AV一区二区| 亚洲精品网站在线观看| 日韩久久久精品影院| 91精品国产免费久久久久久| 亚洲视频免费播放| 国产女人级18视频| 午夜无码专区性视频性视频| 一区二区三区欧美日韩不卡| 亚洲av无码成人精品区一区| 亚洲成AV人在线视午夜片| 蜜桃久久久aaaa成人网一区| 久久久在线视频精品免费| 综合精品欧美三级| 日本午夜免a费看大片中文4| 亚洲欧美精品SUV| 亚洲韩国日本AA片片| 亚洲制服丝袜一区二| 亚洲精品中文字幕无乱码| 日本一区视频在线播放| 三级AV无码中文字幕滚动| 亚洲日韩乱码一区二区三区四区| 中文字幕在线一区精品| 亚洲精品人成无码中文毛片| 亚洲一级簧片大全二级黄区| 曰本黄页在线观看| 一级特黄AAA大片在线观看| 国产全是老熟女太爽了| 永久免费看mv网站| 久久九九亚洲精品美国国内产一级| 韩国伦理片免费观看| http://baichidingdikj.com http://hbysff.com http://jiaoshidasai.com http://hhehy2006.com http://bjzpwhcm.com http://791163.com