[flink 实时流基础] flink 源算子

学习笔记

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
image.png


在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:
DataStream stream = env.addSource(…);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

1. 从集合读
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. 从集合读
//        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 2, 3));

        // 2. 直接填元素
        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);

        source.print();

        env.execute();
    }
2. 从文件读取
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-files</artifactId>
			<version>${flink.version}</version>
		</dependency>

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FileSource<String> source = FileSource.forRecordStreamFormat(
            new TextLineInputFormat(),
            new Path("input/world.txt"))
            .build();

        env
            .fromSource(source, WatermarkStrategy.noWatermarks(), "fileSource")
            .print();


        env.execute();
    }
3. 从 socket 读取
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
        source.print();


        env.execute();
    }

可以使用 nc -l 7777创建一个监听链接的 tcp

4. 从 kafka 读取
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("hadoop102:9092")
            .setTopics("topic_1")
            .setGroupId("atguigu")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema()) 
            .build();

        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

        stream.print("Kafka");

        env.execute();
    }
5. 从数据生成器读取数据
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-datagen</artifactId>
			<version>${flink.version}</version>
		</dependency>
 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "Number:" + value;
            }
        }, 10, // 自动生成的数字序列
            RateLimiterStrategy.perSecond(10), // 限速策略,每秒生成10条
            Types.STRING // 返回类型
        );


        env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();


        env.execute();


    }

相关推荐

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-31 23:30:06       5 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-31 23:30:06       5 阅读
  3. 在Django里面运行非项目文件

    2024-03-31 23:30:06       4 阅读
  4. Python语言-面向对象

    2024-03-31 23:30:06       6 阅读

热门阅读

  1. C基础知识笔记一

    2024-03-31 23:30:06       30 阅读
  2. Python 基础教程:面向对象

    2024-03-31 23:30:06       26 阅读
  3. 关于 UnityEditorWindow

    2024-03-31 23:30:06       25 阅读
  4. 「PHP系列」PHP变量

    2024-03-31 23:30:06       57 阅读
  5. 计算机世界的“十六进制”为什么如此重要

    2024-03-31 23:30:06       24 阅读
  6. 蓝桥杯2014年第十三届省赛真题-切面条

    2024-03-31 23:30:06       24 阅读
  7. 【1单片机入门记录】DS18B20的应用

    2024-03-31 23:30:06       48 阅读
  8. C++中的类型转换

    2024-03-31 23:30:06       26 阅读
  9. C语言刷题(21)

    2024-03-31 23:30:06       20 阅读
  10. 算法刷题day37

    2024-03-31 23:30:06       25 阅读