Skip to content

5. Turn Process

Rafa Paradela edited this page Jun 4, 2023 · 14 revisions

TURN SEQUENCE

TTTServer/GameStream process turns

Let's complete the last piece of the server implementation: turns processing. At this point, our server can process login and join game events. It also adds the functionality through kafka-streams to transform player events in the input topic into new game events.

Now, we will see how to use one functionality in kafka-streams to join multiple streams and split the messages into different branches based on conditions.

Exercise 22:

⚠️ Before tackling this exercise: Check out this commit 5176f1ba8272445b735b2c924953ca4867824a77

We have provided the kafka-streams functionality in this exercise's commit, so let's review it:

def turnStream(): Unit =
  val VALID_TURN   = "Valid-Turn"
  val INVALID_TURN = "Invalid-Turn"
  val NAME_BRANCH  = "Turn-Branch-"
  val matchTable   = builder.table[GameId, Game](kfg.topics.gameTopic)
  val branches     = builder
    .stream[EventId, TTTEvent](kfg.topics.inputTopic)
    .flatMap[GameId, AggMessage[EventId, TurnGame]] {
      case (eventId, TTTEvent(_, TurnGame(gameId, playerId, position, piece))) =>
        Some((gameId, AggMessage[EventId, TurnGame](eventId, TurnGame(gameId, playerId, position, piece))))
      case _                                                                   => None
    }
    .leftJoin(matchTable)(TurnLogic.processTurn)
    .split(Named.as(NAME_BRANCH))
    .branch((_, eitherMatch) => eitherMatch.isRight, Branched.as(VALID_TURN))
    .branch((_, eitherMatch) => eitherMatch.isLeft, Branched.as(INVALID_TURN))
    .noDefaultBranch()

  branches(s"$NAME_BRANCH$VALID_TURN").flatMapValues(_.toOption).to(kfg.topics.gameTopic)
  branches(s"$NAME_BRANCH$INVALID_TURN")
    .flatMapValues(_.swap.toOption)
    .map((_, re) => (EventId.unsafe(), TTTEvent(Instant.now(), re)))
    .to(kfg.topics.inputTopic)

This code performs the following logic:

  • Define a Kafka table using the gameTopic
  • Read TurnGame events from the inputTopic and transform them into AggMessage
  • Join this stream with the game topic table
  • Apply a logic defined in TurnLogic.processTurn. This method returns an Either validating the turn
  • Valid turns are sent to the game topic, for further processing
  • Invalid turns are sent to the input topic as a RejectEvents

Now, open TurnLogic (modules/server/src/main/scala/scaladays/kafka/topology/TurnLogic.scala) and implement the processTurn method. This needs to read the game state and the event and generate a new game state (or an error if it is not processable).

Since the client will start sending turn events, your second task is to update TTTServer (modules/server/src/main/scala/scaladays/server/TTTServer.scala) to process turn events.

Solution commit

0217b2952d02636ca38bb1710e27e194e03a2dca

Client send new movements

In previous exercises, we prepared the WebSocket to listening to the Game events, and that way, the app reacts accordingly. But it's shown in the next diagram, we are not propagating the Msg.RequestNewMovement to the server via the WebSocket.

notitle

Exercise 23:

Part 1: Publish new movements through the WebSocket

In the file ScalaDaysClient.scala, implement the method:

def publishWs(playerId: PlayerId, gameId: GameId, movement: Movement, ws: WebSocket[F]): Cmd[F, Msg]

taking into account the following tips:

Tip 1: WS.publish

Tyrian.WebSocket implements the function def publish[Msg](message: String): Cmd[F, Msg]

Tip 2: ClientAction

The ClientAction that the server expects is Turn.

Tip 3: Turn encoder

turn.asJson.noSpacesSortKeys encodes a Turn into a String

Part 2: React to the event Msg.RequestNewMovement

In the file Update.scala, please react to the event Msg.RequestNewMovement(game, newMovement).

Tip 1: New Model

The Model evolves with a new Game, which has the state GameState.Processing and the list of movements includes the newMovement

Tip 2: New Cmd

We have to publish the new movement via WebSocket by calling scalaDaysClient.publishWs(playerId, game.gameId, newMovement, ws)

Solution commit

f6c7ecfe42f1ec4b25897f4dc0888f7b8c510cdd

Client implements restart game

So we can create new WebSocket connections, we can subscribe to new events and we can publish messages. However we didn't put a solution to disconnect the WS when game is over or when we want to play another game.

Exercise 24:

Part 1: Disconnect the WebSocket

In the file ScalaDaysClient.scala, implement the method:

def disconnectWebSocket(ws: WebSocket[F]): Cmd[F, Msg]

taking into account the following tips:

Tip 1: WS.disconnect

Tyrian.WebSocket implements the function def disconnect[Msg]: Cmd[F, Msg]

Part 2: React to the event Msg.Restart

When we implemented the GameView, we included a button to restart game when it's over:

button(tpe := "button", cls := "btn btn-primary btn-lg px-4 gap-3", onClick(Msg.Restart))("Restart")

image

In the file Update.scala, please react to the event Msg.Restart.

(model.copy(contest = Contest.Empty, ws = None, errors = Nil), model.ws.fold(Cmd.None)(scalaDaysClient.disconnectWebSocket))

Tip 1: New Model

contest should adopt the Contest.Empty value, the ws should be None, the errors should be Nil`

Tip 2: New Cmd

If the Model.ws is defined then we have to disconnect it by ScalaDaysClient.disconnectWebSocket, otherwise Cmd.None.

Solution commit

251d183b271765dad01cd88cef70f8a3ad4485dc

Clone this wiki locally