TOP

flink實戰--讀寫Hbase
2019-03-05 01:41:41 】 瀏覽:1800
Tags:

版權聲明:原創文章 歡迎參考 請勿抄襲 https://blog.csdn.net/aA518189/article/details/86544844

簡介

在Flink文檔中,提供connector讀取源數據和把處理結果存儲到外部系統中。但是沒有提供數據庫的connector,如果要讀寫數據庫,官網給出了異步IO(Asynchronous I/O)專門用于訪問外部數據优乐棋牌app下载,詳細可看:

還有一種方法是繼承RichSourceFunction,重寫里面的方法,所有的數據庫flink都可以通過這兩種方式進行數據的讀寫,這里以hbase為例進行說明。

flink讀寫Hbase

寫入HBase提供兩種方式

  1. 第一種:繼承RichSourceFunction重寫父類方法
  2. 第二種:實現OutputFormat接口

本文主要介紹我們實現OutputFormat接口的具體步驟

實現OutputFormat接口

實現方式:

我們需要自己自定義一個hbase的操作類實現OutputFormat接口,重寫里面的抽象方法,也就是下面的抽象方法

public interface OutputFormat<IT> extends Serializable {
	void configure(Configuration parameters);
	void open(int taskNumber, int numTasks) throws IOException;
	void writeRecord(IT record) throws IOException;
	void close() throws IOException;
}

抽象方法說明

configure

configure方法主要用于:配置輸出格式。由于輸出格式是通用的,因此是無參數的优乐棋牌app下载,這個方法是輸出格式根據配置值設置基本字段的地方,此方法總是在實例化輸出格式上首先調用,但是我們不會這個方法做實際測操作。

open

用于打開輸出格式的并行實例,以存儲其并行實例的結果,調用此方法時,將確保配置該方法的輸出格式。所以在open方法中我們會進行hbase的連接,配置,建表等操作。

writeRecord

用于將數據寫入數據源,所以我們會在這個方法中調用寫入hbase的API

close

這個不用說了就是關閉數據源的連接

導入依賴

<dependency>
 <groupId>org.apache.hbase</groupId>
 <artifactId>hbase-client</artifactId>
 <version>1.2.4</version>
 </dependency>
<dependency>
 <groupId>org.apache.hbase</groupId>
 <artifactId>hbase-server</artifactId>
 <version>1.2.4</version>
</dependency>

實例

public class HBaseOutputFormat implements OutputFormat<Tuple5<Long, Long, Long, String, Long>> {
    private org.apache.hadoop.conf.Configuration conf = null;
    private Connection conn = null;
    private Table table = null;
    @Override
    public void configure(Configuration parameters) {
    }
    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        HbaseUtil.setConf("ip1,ip2,ip3", "2181");
        conn = HbaseUtil.connection;
        HbaseUtil.createTable("flink_test2","info");
    }
    @Override
    public void writeRecord(Tuple5<Long, Long, Long, String, Long> record) throws IOException {
        Put put = new Put(Bytes.toBytes(record.f0+record.f4));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("uerid"), Bytes.toBytes(record.f0));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("behavior"), Bytes.toBytes(record.f3));
        ArrayList<Put> putList = new ArrayList<>();
        putList.add(put);