TOP

Akka(22): Stream:實時操控:動態管道連接-MergeHub,BroadcastHub and PartitionHub(一)
2017-10-09 13:27:34 】 瀏覽:9872
Tags:

  在現實中我們會經常遇到這樣的場景:有一個固定的數據源Source,我們希望按照程序運行狀態來接駁任意數量的下游接收方subscriber、又或者我需要在程序運行時(runtime)把多個數據流向某個固定的數據流終端Sink推送。這就涉及到動態連接合并型Merge或擴散型Broadcast的數據流連接點junction。從akka-stream的技術文檔得知:一對多,多對一或多對多類型的復雜數據流組件必須用GraphDSL來設計,產生Graph類型結果。前面我們提到過:Graph就是一種運算預案,要求所有的運算環節都必須是預先明確指定的优乐棋牌app下载优乐棋牌app下载,如此應該是無法實現動態的管道連接的。但akka-stream提供了MergeHub,BroadcastHub和PartitionHub來支持這樣的功能需求。

1、MergeHub:多對一合并類型。支持動態的多個上游publisher連接

2、BroadcastHub:一對多擴散類型。支持動態的多個下游subscriber連接

3、PartitionHub:實際上是一對多擴散類型。通過一個函數來選擇數據派送目的地

MergeHub對象中有個source函數:

  def source[T](perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = Source.fromGraph(new MergeHub[T](perProducerBufferSize))

MergeHub.source函數的返回結果類型是Source[T,Sink[T,NotUsed]],本質上MergeHub就是一個共用的Sink优乐棋牌app下载,如下所示:

  val fixedSink = Sink.foreach(println) val sinkGraph: RunnableGraph[Sink[Any,NotUsed]] = MergeHub.source(perProducerBufferSize = 16).to(fixedSink) val inGate: Sink[Any,NotUsed] = sinkGraph.run()   //common input //now connect any number of source
  val (killSwitch,_) = (Source(Stream.from(0)).delay(1.second,DelayOverflowStrategy.backpressure) .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run() val (killSwitch2,_) = (Source(List("a","b","c","d","e")).delay(2.second,DelayOverflowStrategy.backpressure) .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run() val (killSwitch3,_) = (Source(List("AA","BB","CC","DD","EE")).delay(3.second,DelayOverflowStrategy.backpressure) .viaMat(KillSwitches.single)(Keep.right).toMat(inGate)(Keep.both)).run() scala.io.StdIn.readLine() killSwitch.shutdown() killSwitch2.shutdown() killSwitch3.shutdown() actorSys.terminate()

同樣,BroadcastHub就是一種共用的Source,可以連接任何數量的下游subscriber。下面是BroadcastHub.sink的定義:

  /** * Creates a [[Sink]] that receives elements from its upstream producer and broadcasts them to a dynamic set * of consumers. After the [[Sink]] returned by this method is materialized, it returns a [[Source]] as materialized * value. This [[Source]] can be materialized an arbitrary number of times and each materialization will receive the * broadcast elements from the original [[Sink]]. * * Every new materialization of the [[Sink]] results in a new, independent hub, which materializes to its own * [[Source]] for consuming the [[Sink]] of that materialization. * * If the original [[Sink]] is fai  
		

請關注公眾號獲取更多資料



首頁 上一頁 1 2 3 4 5 下一頁 尾頁 1/5/5
】【打印繁體】【】【】 【】【】【】 【關閉】 【返回頂部
上一篇Akka(21): Stream:實時操控:.. 下一篇scala基本語法和單詞統計