5. Turn Process
Let's complete the last piece of the server implementation: turn processing. At this point, our server can process login and join-game events. It also uses kafka-streams to transform player events in the input topic into new game events.
Now we will see how to use kafka-streams to join a stream with a table and to split the resulting messages into different branches based on conditions.
372d06ada59682cec87f42f5ea987f9840196fde
We have provided the kafka-streams functionality in this exercise's commit, so let's review it:
def turnStream(): Unit =
  val VALID_TURN   = "Valid-Turn"
  val INVALID_TURN = "Invalid-Turn"
  val NAME_BRANCH  = "Turn-Branch-"
  val matchTable   = builder.table[GameId, Game](kfg.topics.gameTopic)
  val branches = builder
    .stream[EventId, TTTEvent](kfg.topics.inputTopic)
    .flatMap[GameId, AggMessage[EventId, TurnGame]] {
      case (eventId, TTTEvent(_, TurnGame(gameId, playerId, position, piece))) =>
        Some((gameId, AggMessage[EventId, TurnGame](eventId, TurnGame(gameId, playerId, position, piece))))
      case _ => None
    }
    .leftJoin(matchTable)(TurnLogic.processTurn)
    .split(Named.as(NAME_BRANCH))
    .branch((_, eitherMatch) => eitherMatch.isRight, Branched.as(VALID_TURN))
    .branch((_, eitherMatch) => eitherMatch.isLeft, Branched.as(INVALID_TURN))
    .noDefaultBranch()
  branches(s"$NAME_BRANCH$VALID_TURN").flatMapValues(_.toOption).to(kfg.topics.gameTopic)
  branches(s"$NAME_BRANCH$INVALID_TURN")
    .flatMapValues(_.swap.toOption)
    .map((_, re) => (EventId.unsafe(), TTTEvent(Instant.now(), re)))
    .to(kfg.topics.inputTopic)
This code performs the following logic:
- Define a Kafka table using the gameTopic
- Read TurnGame events from the inputTopic and transform them into AggMessage
- Join this stream with the game topic table
- Apply the logic defined in TurnLogic.processTurn. This method returns an Either validating the turn
- Valid turns are sent to the game topic, for further processing
- Invalid turns are sent to the input topic as RejectEvents
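To make the join-then-split flow in the steps above easier to picture, here is a minimal sketch that mimics it with plain Scala collections instead of Kafka. Everything in it is an assumption made for illustration: Game, Turn, Reject, and validate are hypothetical, simplified stand-ins, not the workshop's types. The missing table entry is modelled as an Option, whereas a real kafka-streams left join passes null for a key with no match.

```scala
// Hypothetical, simplified stand-ins for the workshop's domain types.
final case class Game(gameId: String, turnsPlayed: Int)
final case class Turn(gameId: String, position: Int)
final case class Reject(gameId: String, reason: String)

// Stand-in for the join function: it receives the turn and the (possibly missing) game.
def validate(turn: Turn, game: Option[Game]): Either[Reject, Game] =
  game match
    case Some(g) => Right(g.copy(turnsPlayed = g.turnsPlayed + 1))
    case None    => Left(Reject(turn.gameId, "unknown game"))

// The "table": the latest known game state per key.
val gameTable: Map[String, Game] = Map("g1" -> Game("g1", 0))
// The "stream": incoming turn events.
val turns: List[Turn] = List(Turn("g1", 4), Turn("g2", 1))

// Left join each turn with the table, then split by the side of the Either.
val results: List[Either[Reject, Game]] = turns.map(t => validate(t, gameTable.get(t.gameId)))
val valid: List[Game]     = results.flatMap(_.toOption)      // mirrors the valid branch
val invalid: List[Reject] = results.flatMap(_.swap.toOption) // mirrors the invalid branch
```

Unlike this sketch, the real topology writes valid games back to the game topic, so the table sees the new state for the next turn.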
Now, open TurnLogic (modules/server/src/main/scala/scaladays/kafka/topology/TurnLogic.scala) and implement the processTurn method. This needs to read the game state and the event and generate a new game state (or an error if it is not processable).
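As a rough orientation for the kind of checks such a method might perform, the following sketch validates a turn against a game state. It is not the workshop's solution: the types are hypothetical, simplified stand-ins, the name processTurnSketch is invented, and the rules it encodes (positions 0 to 8, Cross always moves first) are assumptions. The real method receives an AggMessage and the joined Game rather than these arguments.

```scala
// Hypothetical, simplified stand-ins; the workshop's real types differ.
enum Piece:
  case Cross, Circle

final case class Movement(position: Int, piece: Piece)
final case class Game(gameId: String, movements: List[Movement])
final case class TurnGame(gameId: String, playerId: String, position: Int, piece: Piece)
final case class RejectEvent(gameId: String, playerId: String, reason: String)

// Assumptions: board positions are 0 to 8, and Cross always moves first.
def processTurnSketch(turn: TurnGame, maybeGame: Option[Game]): Either[RejectEvent, Game] =
  def reject(reason: String): Either[RejectEvent, Game] =
    Left(RejectEvent(turn.gameId, turn.playerId, reason))

  maybeGame match
    case None => reject("unknown game")
    case Some(game) =>
      // With an even number of movements played, it is Cross's turn.
      val expectedPiece = if game.movements.size % 2 == 0 then Piece.Cross else Piece.Circle
      if turn.position < 0 || turn.position > 8 then reject("position outside the board")
      else if game.movements.exists(_.position == turn.position) then reject("position already taken")
      else if turn.piece != expectedPiece then reject("not this piece's turn")
      else Right(game.copy(movements = game.movements :+ Movement(turn.position, turn.piece)))
```

Returning an Either keeps the result compatible with the two branches of the topology: the Right side carries the new game state and the Left side carries the rejection.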
Since the client will start sending turn events, your second task is to update TTTServer (modules/server/src/main/scala/scaladays/server/TTTServer.scala) to process turn events.
Solution commit
https://github.com/xebia-functional/tictactoe/commit/bcf4e72f6f3a05e57f33fcc987ee969df1786d90
In previous exercises, we prepared the WebSocket to listen to the game events so that the app reacts accordingly.
But, as shown in the next diagram, we are not yet propagating Msg.RequestNewMovement to the server via the WebSocket.
In the file ScalaDaysClient.scala, implement the method:
def publishWs(playerId: PlayerId, gameId: GameId, movement: Movement, ws: WebSocket[F]): Cmd[F, Msg]
taking into account the following tips:
Tip 1: WS.publish
Tyrian.WebSocket implements the function def publish[Msg](message: String): Cmd[F, Msg]
Tip 2: ClientAction
The ClientAction that the server expects is Turn.
Tip 3: Turn encoder
turn.asJson.noSpacesSortKeys encodes a Turn into a String
In the file Update.scala, please react to the event Msg.RequestNewMovement(game, newMovement).
Tip 1: New Model
The Model evolves with a new Game, which has the state GameState.Processing and whose list of movements includes the newMovement
Tip 2: New Cmd
We have to publish the new movement via WebSocket by calling scalaDaysClient.publishWs(playerId, game.gameId, newMovement, ws)
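The model change described in Tip 1 can be pictured with the sketch below. It only covers the model side and uses hypothetical, simplified types: this Model, Game, and Movement are stand-ins invented for illustration, the helper name onRequestNewMovement is not from the workshop, and appending the movement at the end of the list is an assumption.

```scala
// Hypothetical, simplified stand-ins for the client's model types.
enum GameState:
  case InProgress, Processing

final case class Movement(position: Int)
final case class Game(gameId: String, state: GameState, movements: List[Movement])
final case class Model(game: Option[Game])

// Returns the next model: the game is marked as Processing and records the new movement.
// Appending (rather than prepending) the movement is an assumption of this sketch.
def onRequestNewMovement(model: Model, game: Game, newMovement: Movement): Model =
  val processing = game.copy(
    state = GameState.Processing,
    movements = game.movements :+ newMovement
  )
  model.copy(game = Some(processing))
```

In the real update function this new model would be paired with the command from Tip 2, so that the movement is also published to the server.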
Solution commit
https://github.com/xebia-functional/tictactoe/commit/a6ecc342e54b8bd6248956ecd2be7a47242768bf
So far, we can create new WebSocket connections, subscribe to new events, and publish messages. However, we have not yet provided a way to disconnect the WebSocket when the game is over or when we want to play another game.
In the file ScalaDaysClient.scala, implement the method:
def disconnectWebSocket(ws: WebSocket[F]): Cmd[F, Msg]
taking into account the following tips:
Tip 1: WS.disconnect
Tyrian.WebSocket implements the function def disconnect[Msg]: Cmd[F, Msg]
When we implemented the GameView, we included a button to restart the game once it is over:
button(tpe := "button", cls := "btn btn-primary btn-lg px-4 gap-3", onClick(Msg.Restart))("Restart")
In the file Update.scala, please react to the event Msg.Restart.
(model.copy(contest = Contest.Empty, ws = None, errors = Nil), model.ws.fold(Cmd.None)(scalaDaysClient.disconnectWebSocket))
Tip 1: New Model
contest should adopt the Contest.Empty value, ws should be None, and errors should be Nil
Tip 2: New Cmd
If Model.ws is defined, we have to disconnect it with ScalaDaysClient.disconnectWebSocket; otherwise, use Cmd.None.
Solution commit
https://github.com/xebia-functional/tictactoe/commit/f615cb3b79272dd9d6b41ce252d31c01c9c3ff72