当前位置: 首页>后端>正文

flink rest api提交一个jar包的接口 flink java api

Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,例如选择,过滤和连接。无论输入是批输入还是流输入,任一接口中指定的查询都具有相同的语义并产生相同的结果。

flink版本:1.8.0

scala版本:2.11.8

1、使用maven引入相关依赖

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <scala.version>2.11.8</scala.version>
    <flink.version>1.8.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
	</dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Table API所需依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- JAVA -->
    <!--<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>-->
    <!-- Scala -->
    <!-- 应先导入:flink-streaming-scala_2.11 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

2、表API和SQL程序的结构

批处理和流式传输的所有Table API和SQL程序都遵循相同的模式。以下代码示例显示了Table API和SQL程序的常见结构

//如果是批处理程序,使用ExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

//创建一个TableEnvironment
val tableEnv = StreamTableEnvironment.create(env)

val table = tableEnv.fromDataStream(env.readTextFile("/data.txt"))
//注册Table
tableEnv.registerTable("table1",...)// or
tableEnv.registerTableSource("table2", ...)     // or
tableEnv.registerExternalCatalog("extCat", ...)
// register an output Table
tableEnv.registerTableSink("outputTable", ...);

// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// Create a Table from a SQL query
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable")

// execute
env.execute()

具体代码示例

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment, _}
import org.apache.flink.table.catalog.InMemoryExternalCatalog
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.table.sources.CsvTableSource

object TableRegist_1 {
  val csvPath = "E:/workspace/flink-practice/src/main/resources/"

  def main(args: Array[String]): Unit = {
    //流式数据
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val stableEnv = StreamTableEnvironment.create(senv)
    val dataStream = senv.fromElements(
      ("张三", 20), ("李四", 22),
      ("王五", 24), ("赵六", 18)
    )
    //注册表的方式:
    //1、通过DataStream
    stableEnv.registerTable("s_people", stableEnv.fromDataStream(dataStream,"name","age"))
    //2、通过查询结果
    val resTable = stableEnv.scan("s_people").select("_1,_2")
    stableEnv.registerTable("resTable", resTable)

    //注册tableSource
    //TableSource的已有实现类:CsvTableSource、Kafka09TableSource
    stableEnv.registerTableSource("s_source",
      CsvTableSource.builder()
        .path(csvPath + "tablesource.csv")
        .field("name", Types.STRING)
        .field("age", Types.INT)
        .build())

    //注册TableSink
    //TableSink的已有实现类:CsvTableSink、Kafka09TableSink
    stableEnv.registerTableSink("s_sink",
      new CsvTableSink(csvPath + "tablesink.csv", ",", 2, WriteMode.OVERWRITE)
        .configure(Array("name", "age")
          , Array(Types.STRING, Types.INT)))

    //注册外部目录
    //外部目录可以提供有关外部数据库和表的信息,例如其名称,架构,统计信息以及有关如何访问存储在外部数据库,表或文件中的数据的信息
    //已有实现类为:InMemoryExternalCatalog
    val catalogCatalog = new InMemoryExternalCatalog("catalog")
    stableEnv.registerExternalCatalog("s_catalog", catalogCatalog)

    /** *********************************************************************************/
    //离线批数据类似流式数据
    val env = ExecutionEnvironment.getExecutionEnvironment
    val btableEnv = BatchTableEnvironment.create(env)
    val dataSet = env.fromElements(
      ("张三", 20), ("李四", 22),
      ("王五", 24), ("赵六", 18)
    )
    btableEnv.registerTable("b_people", btableEnv.fromDataSet(dataSet))
  }
}

2.1、将DataStream或DataSet与表之间的注册与转换

  • 将DataStream或DataSet注册为表
// get TableEnvironment 
// registration of a DataSet is equivalent
val tableEnv = StreamTableEnvironment.create(env)

val stream: DataStream[(Long, String)] = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
  • 将DataStream或DataSet转换为表
val env = StreamExecutionEnvironment.getExecutionEnvironment

// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = StreamTableEnvironment.create(env)

val stream: DataStream[Long] = env.fromCollection(Seq(1L))

// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)
  • 将表转换为DataStream或DataSet
// get TableEnvironment. 
// registration of a DataSet is equivalent
val tableEnv = StreamTableEnvironment.create(env)

// Table with two fields (String name, Integer age)
val table: Table = ...

// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple = 
  tableEnv.toAppendStream[(String, Int)](table)

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream[(Boolean, X)]. 
//   The boolean field indicates the type of the change. 
//   True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

2.2 创建一个TableEnvironment

TableEnvironment是Table API和SQL集成的核心概念。它负责:
    1. Table在内部目录中注册表
    2. 注册外部目录
    3. 执行SQL查询
    4. 注册用户定义的(标量,表或聚合)函数
    5. 转换a DataStream或DataSet转换为aTable
    6. 持有对ExecutionEnvironment或的引用StreamExecutionEnvironment

如果是批处理,调用BatchTableEnvironment.create()方法创建TableEnvironment,如果是流处理,怎调用StreamTableEnvironment.create()方法。

// ***************
// 流式数据查询
// ***************
import org.apache.flink.table.api.scala.StreamTableEnvironment

val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
val sTableEnv = StreamTableEnvironment.create(sEnv)

// ***********
// 批数据处理查询
// ***********
import org.apache.flink.table.api.scala.BatchTableEnvironment

val bEnv = ExecutionEnvironment.getExecutionEnvironment
val bTableEnv = BatchTableEnvironment.create(bEnv)

2.3 在目录中注册表

TableEnvironment维护按名称注册的表的目录。有两种类型的表,输入表和输出表。输入表可以在表API和SQL查询中引用,并提供输入数据。输出表可用于将Table API或SQL查询的结果发送到外部系统。

注册输入表的方法:
    1. 现有Table对象,通常是Table API或SQL查询的结果。
    2. TableSource,访问外部数据,例如文件,数据库或消息传递系统。
    3. DataStream或DataSet来自DataStream或DataSet程序

2.4 注册表

// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Table is the result of a simple projection query 
Table projTable = tableEnv.scan("X").select(...);

// register the Table projTable as table "projectedX"
tableEnv.registerTable("projectedTable", projTable);

2.5 注册TableSource

TableSource提供对外部数据的访问,存储在存储系统中

// get a TableEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// create a TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)

// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)

2.6 注册TableSink

已注册TableSink可用于将表API或SQL查询的结果发送到外部存储系统,例如数据库,键值存储,消息队列或文件系统

// get a TableEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)

// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)

// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

2.7 注册外部目录

外部目录可以提供有关外部数据库和表的信息,例如其名称,架构,统计信息以及有关如何访问存储在外部数据库,表或文件中的数据的信息。

// get a TableEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// create an external catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog

// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

 


https://www.xamrdz.com/backend/3sd1921998.html

相关文章: