TOP

HBaseRDD -- 封裝Spark on HBase操作到Spark RDD
2019-03-06 13:48:55 】 瀏覽:799
Tags:

版權聲明:原創作品,如需轉載,請注明出處。否則將追究法律責任 https://blog.csdn.net/chenjian_grt/article/details/80347273

最近閱讀Spark的源代碼,發現Spark使用隱式轉換,對rdd進行擴展,提供額外的功能,如PairRDDFunctions,對RDD進行擴展,提供諸如orderByKey等方法,前段時間我們使用Spark操作HBase,由于急著上線,未能對功能進行較好的封裝优乐棋牌app下载,現在回過頭去看,發現其實可以模仿PairRDDFunctions的實現,做一個HBaseRDDFuctions,通過隱式轉換,實現對HBase基本操作的封裝。

HBaseRDDFuctions的代碼實現如下:

import java.io.IOException

import java.util

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.hadoop.hbase.KeyValue.Type

import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}

import org.apache.hadoop.hbase.client._

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableInputFormat}

import org.apache.hadoop.hbase.protobuf.ProtobufUtil

import org.apache.hadoop.hbase.util.{Base64, Bytes}

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

import scala.language.implicitConversions

import scala.collection.mutable.ListBuffer

class HBaseRDDFunctions[T](self: RDD[T]) {

private def saveToHBase(sc: SparkContext,

tableName: String,

hbaseRDD: RDD[(ImmutableBytesWritable, KeyValue)],

tmpHFilePath: String

): Unit = {

var conn: Connection = null

var realTable: Table = null

val hbaseConf = HBaseConfiguration.create(sc.hadoopConfiguration)

val hbTableName = TableName.valueOf(tableName)

val job = Job.getInstance(hbaseConf)

val fs = FileSystem.get(sc.hadoopConfiguration)

fs.deleteOnExit(new Path(tmpHFilePath))

try {

conn = ConnectionFactory.createConnection(hbaseConf)

job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])

job.setMapOutputValueClass(classOf[KeyValue])

realTable = conn.getTable(hbTableName)

HFileOutputFormat2.configureIncrementalLoad(job, realTable, conn.getRegionLocator(hbTableName))

hbaseRDD.saveAsNewAPIHadoopFile(tmpHFilePath, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)

val loader = new LoadIncrementalHFiles(hbaseConf)

loader.doBulkLoad(new Path(tmpHFilePath), realTable.asInstanceOf[HTable])

} finally {

fs.deleteOnExit(new Path(tmpHFilePath))

if (realTable != null) {

realTable.close()

}

if (conn != null) {

conn.close()

}

}

}

def hbaseBulkDeleteColumn(sc: SparkContext,

tableName: String,

tmpHFilePath: String,

transfer: T => Array[Byte]

): Unit = {

implicit val rowKeyOrding = new Ordering[Array[Byte]] {

override def compare(left: Array[Byte], right: Array[Byte]) = {

Bytes.compareTo(left, right)

}

}

val ts = System.currentTimeMillis()

val hbaseRdd = self.map(rd => {

(transfer(rd), Type.DeleteColumn.getCode)

}).sortByKey().map(rd => {

(new ImmutableBytesWritable(rd._1), new KeyValue(rd._1, ts, Type.DeleteColumn))

})

saveToHBase(sc, tableName, hbaseRdd, tmpHFilePath)

}

def hbaseBulkLoad(sc: SparkContext,

tableName: String,

tmpHFilePath: String,

transfer: T => List[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])],

kvType: Type = Type.Put

): Unit = {

implicit val rowKeyOrding = new Ordering[Array[Byte]] {

override def compare(left: Array[Byte], right: Array[Byte]) = {

Bytes.compareTo(left, right)

}

}

val ts = System.currentTimeMillis()

val hbaseRDD = self.flatMap(transfer).map(rd => {

val rowKey = rd._1

val columnFamily = rd._2

val qualifier = rd._3

val value = rd._4

val key = new Array[Byte](rowKey.length + columnFamily.length + qualifier.length)

System.arraycopy(rowKey, 0, key, 0, rowKey.length)

System.arraycopy(columnFamily, 0, key, rowKey.length, columnFamily.length)

System.arraycopy(qualifier, 0, key, rowKey.length + columnFamily.length, qualifier.length)

(key, (rowKey.length, columnFamily.length, value))

}).sortByKey().map(rd => {

val rowKey = new Array[Byte](rd._2._1)

val columnFamily = new Array[Byte](rd._2._2)

val qualifier = new Array[Byte](rd._1.length - rd._2._1 - rd._2._2)

System.arraycopy(rd._1, 0, rowKey, 0, rd._2._1)

System.arraycopy(rd._1, rd._2._1, columnFamily, 0, rd._2._2)

System.arraycopy(rd._1, rd._2._1 + rd._2._2, qualifier, 0, rd._1.length - rd._2._1 - rd._2._2)

val kv = new KeyValue(rowKey, columnFamily, qualifier, ts, kvType, rd._2._3)

(new ImmutableBytesWritable(rowKey), kv)

})

saveToHBase(sc, tableName, hbaseRDD, tmpHFilePath)

}

def hbaseBulkDelete(sc: SparkContext,

tableName: String,

tmpHFilePath: String,

transfer: T => List[(Array[Byte], Array[Byte], Array[Byte], Array[Byte])]

): Unit = {

hbaseBulkLoad(sc, tableName, tmpHFilePath, transfer, Type.DeleteColumn)

}

def hbaseGet(sc: SparkContext,

tableName: String,

transfer: T => Get,

zkQuorum: String,

zkPort: String,

num: Int = 100

): RDD[Result] = {

self.map(transfer).mapPartitions(it => {

val lb = new ListBuffer[Result]()

val batch = new util.ArrayList[Row]

var realTable: Table = null

val hbTableName = TableName.valueOf(tableName)

var conn: Connection = null

val hbConf = HBaseConfiguration.create()

hbConf.set("hbase.zookeeper.property.clientPort", zkPort)

hbConf.set("hbase.zookeeper.quorum", zkQuorum)

try {

conn = ConnectionFactory.createConnection(hbConf)

realTable = conn.getTable(hbTableName)

while (it.hasNext) {

val get = it.next()

get.setCacheBlocks(false)

batch.add(get)

if (batch.size >= num) {

val results = new Array[Object](batch.size)

realTable.batch(batch, results)

results.foreach(x => {

lb += x.asInstanceOf[Result]

})

}

}

if (batch.size() != 0) {

val results = new Array[Object](batch.size)

realTable.batch(batch, results)

results.foreach(x => {

lb += x.asInstanceOf[Result]

})

}

} finally {

if (realTable != null) {

try {

//關閉HTable對象

realTable.close()

} catch {

case e: IOException =>

e.printStackTrace();

}

}

if (conn != null) {

try {

// 關閉HBase連接

conn.close()

} catch {

case e: IOException =>

e.printStackTrace();

}

}

}

lb.iterator

})

}

def hbaseDelete(sc: SparkContext,

tableName: String,

transfer: T => Delete,

zkQuorum: String,

zkPort: String,

num: Int = 100

): Unit = {

hbaseMutation(sc, tableName, transfer, zkQuorum, zkPort, num)

}

def hbaseMutation(sc: SparkContext,

tableName: String,

transfer: T => Mutation,

zkQuorum: String,

zkPort: String,

num: Int = 100

): Unit = {

self.foreachPartition(it => {

var conn: Connection = null

var realTable: Table = null

val hbConf = HBaseConfiguration.create()

hbConf.set("hbase.zookeeper.property.clientPort", zkPort)

hbConf.set("hbase.zookeeper.quorum", zkQuorum)

val hbTableName = TableName.valueOf(tableName)

try {

conn = ConnectionFactory.createConnection(hbConf)

realTable = conn.getTable(hbTableName)

val mutationList = new java.util.ArrayList[Mutation]

while (it.hasNext) {

mutationList.add(transfer(it.next()))

if (mutationList.size >= num) {

realTable.batch(mutationList, null)

mutationList.clear()

}

}

if (mutationList.size > 0) {

realTable.batch(mutationList, null)

mutationList.clear()

}

} finally {

if (realTable != null) {

try {

//關閉HTable對象

realTable.close()

} catch {

case e: IOException =>

e.printStackTrace();

}

}

if (conn != null) {

try {

// 關閉HBase連接

conn.close()

} catch {

case e: IOException =>

e.printStackTrace();

}

}

}

})

}

def hbaseScan(sc: SparkContext,

tableName: String,

transfer: T => Scan,

zkQuorum: String,

zkPort: String

): RDD[Result] = {

self.mapPartitions(it => {

val lb = new ListBuffer[Result]()

var conn: Connection = null

var realTable: Table = null

val hbConf = HBaseConfiguration.create()

hbConf.set("hbase.zookeeper.property.clientPort", zkPort)

hbConf.set("hbase.zookeeper.quorum", zkQuorum)

val hbTableName = TableName.valueOf(tableName)

try {

conn = ConnectionFactory.createConnection(hbConf)

realTable = conn.getTable(hbTableName)

var scanner: ResultScanner = null

try {

while (it.hasNext) {

val scan = transfer(it.next())

scan.setCacheBlocks(false)

scanner = realTable.getScanner(scan)

val scannerIt = scanner.iterator

while (scannerIt.hasNext) {

val rs = scannerIt.next()

lb += rs

}

}

} finally {

if (scanner != null) {

scanner.close()

}

}

} finally {

if (realTable != null) {

try {

//關閉HTable對象

realTable.close()

} catch {

case e: IOException =>

e.printStackTrace();

}

}

if (conn != null) {

try {

// 關閉HBase連接

conn.close()

} catch {

case e: IOException =>

e.printStackTrace();

}

}

}

lb.iterator

})

}

def hbasePut(sc: SparkContext,

tableName: String,

transfer: T => Put,

zkQuorum: String,

zkPort: String,

num: Int = 100

): Unit = {

hbaseMutation(sc, tableName, transfer, zkQuorum, zkPort, num)

}

def hbaseIncrement(sc: SparkContext,

tableName: String,

transfer: T => Increment,

zkQuorum: String,

zkPort: String,

num: Int = 100

): Unit = {

hbaseMutation(sc, tableName, transfer, zkQuorum, zkPort, num)

}

}

object HBaseRDDFunctions {

def fromScan(sc: SparkContext,

tableName: String,

scan: Scan

): RDD[(ImmutableBytesWritable, Result)] = {

scan.setCacheBlocks(false)

val proto = ProtobufUtil.toScan(scan)

val scanToString = Base64.encodeBytes(proto.toByteArray)

val hbConf = HBaseConfiguration.create(sc.hadoopConfiguration)

hbConf.set(TableInputFormat.INPUT_TABLE, tableName)

hbConf.set(TableInputFormat.SCAN, scanToString)

sc.newAPIHadoopRDD(hbConf,

classOf[TableInputFormat],

classOf[ImmutableBytesWritable],

classOf[Result]

)

}

implicit def rddToHBaseRDD[T](rdd: RDD[T]): HBaseRDDFunctions[T] = {

new HBaseRDDFunctions(rdd)

}

}

注:HBase的版本為1.0.2, Spark的版本為2.1.0

使用方法:

1、導入隱式轉換函數:

2、導入后优乐棋牌app下载,spark rdd將得到擴展,出現我們自己開發的Spark on HBase的方法:

請關注公眾號獲取更多資料



】【打印繁體】【】【】 【】【】【】 【關閉】 【返回頂部
上一篇Hbase   snapshot 下一篇hbase的JavaAPI操作:連接、創建..