Skip to content

Commit

Permalink
Replacing Akka IO with Akka Streams #5
Browse files Browse the repository at this point in the history
* New flow to encode Frames into ByteStrings
  • Loading branch information
fcabestre committed Jun 11, 2016
1 parent b9a13c0 commit 08a5483
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,13 @@ class BytesToFrames extends GraphStage[FlowShape[ByteString, Frame]] {

def onError(error: Err) = fail(out, new Exception(error.messageWithContext))

@throws[Exception](classOf[Exception])
override def onPush(): Unit = {
val bits = remainingBytes ++ BitVector(grab(in).asByteBuffer)
decode(bits).fold(onError, onSuccess)
}

@throws[Exception](classOf[Exception])
override def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)

setHandler(out, this)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package net.sigusr.mqtt.impl.stages

import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.util.ByteString
import net.sigusr.mqtt.impl.frames.Frame
import scodec.Codec

class FramesToBytes extends GraphStage[FlowShape[Frame, ByteString]] {

val in = Inlet[Frame]("FramesToBytes.in")
val out = Outlet[ByteString]("FramesToBytes.out")

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler with InHandler {

@throws[Exception](classOf[Exception])
override def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)

@throws[Exception](classOf[Exception])
override def onPush(): Unit = {
val result = Codec[Frame].encode(grab(in)).require
if (result.nonEmpty)
push(out, ByteString(result.toByteArray))
tryPull(in)
}

setHandler(out, this)
setHandler(in, this)
}

override def shape: FlowShape[Frame, ByteString] = FlowShape(in, out)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package net.sigusr.mqtt.impl.stages

import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.{Source, Keep}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import akka.util.ByteString
import net.sigusr.mqtt.SpecsTestKit
Expand All @@ -26,7 +26,36 @@ import org.specs2.mutable._
import scodec.Codec
import scodec.bits.ByteVector

object FlowsSpecs extends Specification {
object FlowsSpec extends Specification {

"A frames to bytes flow" should {
"Provide a byte stream from a correct frame" in new SpecsTestKit {
val header = Header(dup = false, AtMostOnce.enum, retain = false)
val connackFrame = ConnackFrame(header, 0)
val encodedConnackFrame = ByteString(Codec[Frame].encode(connackFrame).require.toByteArray)

val flow = new FramesToBytes
val (pub, sub) = TestSource.probe[Frame].via(flow).toMat(TestSink.probe[ByteString])(Keep.both).run()

sub.request(1)
pub.sendNext(connackFrame)
val result = sub.expectNext()
result should_=== encodedConnackFrame
}

"Provide a byte stream (pull) from a correct frame" in new SpecsTestKit {
val header = Header(dup = false, AtMostOnce.enum, retain = false)
val connackFrame = ConnackFrame(header, 0)
val encodedConnackFrame = ByteString(Codec[Frame].encode(connackFrame).require.toByteArray)

val flow = new FramesToBytes
val (pub, sub) = Source.single(connackFrame).via(flow).toMat(TestSink.probe[ByteString])(Keep.both).run()

sub.request(1)
val result = sub.expectNext()
result should_=== encodedConnackFrame
}
}

"A bytes to frames flow" should {

Expand All @@ -44,6 +73,19 @@ object FlowsSpecs extends Specification {
result should_=== connackFrame
}

"Provide a frame (pull) from a correct byte stream" in new SpecsTestKit {
val header = Header(dup = false, AtMostOnce.enum, retain = false)
val connackFrame = ConnackFrame(header, 0)
val encodedConnackFrame = ByteString(Codec[Frame].encode(connackFrame).require.toByteArray)

val flow = new BytesToFrames
val (pub, sub) = Source.single(encodedConnackFrame).via(flow).toMat(TestSink.probe[Frame])(Keep.both).run()

sub.request(1)
val result = sub.expectNext()
result should_=== connackFrame
}

"Provide a frame from a (big) correct byte stream" in new SpecsTestKit {
import net.sigusr.mqtt.SpecUtils._

Expand Down Expand Up @@ -77,7 +119,7 @@ object FlowsSpecs extends Specification {
sub.expectNextN(List(frame0x01, frame0x7f))
}

"Fail from a failing byte stream" in new SpecsTestKit {
"Fail from a byte stream error decoding" in new SpecsTestKit {
val garbageFrame = ByteString(0xff)

val flow = new BytesToFrames
Expand All @@ -86,6 +128,25 @@ object FlowsSpecs extends Specification {
sub.request(1)
pub.sendNext(garbageFrame)
val result = sub.expectError()
result.getMessage should_=== "Unknown discriminator 15"
}

"Fail from a failing byte stream" in new SpecsTestKit {
val flow = new BytesToFrames
val (pub, sub) = TestSource.probe[ByteString].via(flow).toMat(TestSink.probe[Frame])(Keep.both).run()

sub.request(1)
pub.sendComplete()
val result = sub.expectComplete()
}
"Be completed from a completed byte stream" in new SpecsTestKit {
val flow = new BytesToFrames
val (pub, sub) = TestSource.probe[ByteString].via(flow).toMat(TestSink.probe[Frame])(Keep.both).run()

sub.request(1)
pub.sendError(new Exception("Unusualsituation ;)"))
val result = sub.expectError()
result.getMessage should_=== "Unusualsituation ;)"
}
}
}

0 comments on commit 08a5483

Please sign in to comment.