Skip to content

5. Turn Process

Fede Fernández edited this page May 29, 2023 · 14 revisions

TURN SEQUENCE

TTTServer/GameStream process turns

Let's complete the last piece of the server implementation: turns processing. At this point, our server can process login and join game events. It also adds the functionality through kafka-streams to transform player events in the input topic into new game events.

Now, we will see how to use one functionality in kafka-streams to join multiple streams and split the messages into different branches based on conditions.

Exercise 22:

⚠️ Before tackling this exercise: Check out this commit 372d06ada59682cec87f42f5ea987f9840196fde

We have provided the kafka-streams functionality in this exercise's commit, so let's review it:

def turnStream(): Unit =
  val VALID_TURN   = "Valid-Turn"
  val INVALID_TURN = "Invalid-Turn"
  val NAME_BRANCH  = "Turn-Branch-"
  val matchTable   = builder.table[GameId, Game](kfg.topics.gameTopic)
  val branches     = builder
    .stream[EventId, TTTEvent](kfg.topics.inputTopic)
    .flatMap[GameId, AggMessage[EventId, TurnGame]] {
      case (eventId, TTTEvent(_, TurnGame(gameId, playerId, position, piece))) =>
        Some((gameId, AggMessage[EventId, TurnGame](eventId, TurnGame(gameId, playerId, position, piece))))
      case _                                                                   => None
    }
    .leftJoin(matchTable)(TurnLogic.processTurn)
    .split(Named.as(NAME_BRANCH))
    .branch((_, eitherMatch) => eitherMatch.isRight, Branched.as(VALID_TURN))
    .branch((_, eitherMatch) => eitherMatch.isLeft, Branched.as(INVALID_TURN))
    .noDefaultBranch()

  branches(s"$NAME_BRANCH$VALID_TURN").flatMapValues(_.toOption).to(kfg.topics.gameTopic)
  branches(s"$NAME_BRANCH$INVALID_TURN")
    .flatMapValues(_.swap.toOption)
    .map((_, re) => (EventId.unsafe(), TTTEvent(Instant.now(), re)))
    .to(kfg.topics.inputTopic)

This code performs the following logic:

  • Define a Kafka table using the gameTopic
  • Read TurnGame events from the inputTopic and transform them into AggMessage
  • Join this stream with the game topic table
  • Apply a logic defined in TurnLogic.processTurn. This method returns an Either validating the turn
  • Valid turns are sent to the game topic, for further processing
  • Invalid turns are sent to the input topic as a RejectEvents

Now, open TurnLogic (modules/server/src/main/scala/scaladays/kafka/topology/TurnLogic.scala) and implement the processTurn method. This needs to read the game state and the event and generate a new game state (or an error if it is not processable).

Since the client will start sending turn events, your second task is to update TTTServer (modules/server/src/main/scala/scaladays/server/TTTServer.scala) to process turn events.

Solution commit https://github.com/xebia-functional/tictactoe/commit/bcf4e72f6f3a05e57f33fcc987ee969df1786d90

Client send new movements

In previous exercises, we prepared the WebSocket to listening to the Game events, and that way, the app reacts accordingly. But it's shown in the next diagram, we are not propagating the Msg.RequestNewMovement to the server via the WebSocket.

notitle

Exercise 23:

Part 1: Publish new movements through the WebSocket

In the file ScalaDaysClient.scala, implement the method:

def publishWs(playerId: PlayerId, gameId: GameId, movement: Movement, ws: WebSocket[F]): Cmd[F, Msg]

taking into account the following tips:

Tip 1: WS.publish

Tyrian.WebSocket implements the function def publish[Msg](message: String): Cmd[F, Msg]

Tip 2: ClientAction

The ClientAction that the server expects is Turn.

Tip 3: Turn encoder

turn.asJson.noSpacesSortKeys encodes a Turn into a String

Part 2: React to the event Msg.RequestNewMovement

In the file Update.scala, please react to the event Msg.RequestNewMovement(game, newMovement).

Tip 1: New Model

The Model evolves with a new Game, which has the state GameState.Processing and the list of movements includes the newMovement

Tip 2: New Cmd

We have to publish the new movement via WebSocket by calling scalaDaysClient.publishWs(playerId, game.gameId, newMovement, ws)

Solution commit https://github.com/xebia-functional/tictactoe/commit/a6ecc342e54b8bd6248956ecd2be7a47242768bf

Client implements restart game

So we can create new WebSocket connections, we can subscribe to new events and we can publish messages. However we didn't put a solution to disconnect the WS when game is over or when we want to play another game.

Exercise 24:

Part 1: Disconnect the WebSocket

In the file ScalaDaysClient.scala, implement the method:

def disconnectWebSocket(ws: WebSocket[F]): Cmd[F, Msg]

taking into account the following tips:

Tip 1: WS.disconnect

Tyrian.WebSocket implements the function def disconnect[Msg]: Cmd[F, Msg]

Part 2: React to the event Msg.Restart

When we implemented the GameView, we included a button to restart game when it's over:

button(tpe := "button", cls := "btn btn-primary btn-lg px-4 gap-3", onClick(Msg.Restart))("Restart")

image

In the file Update.scala, please react to the event Msg.Restart.

(model.copy(contest = Contest.Empty, ws = None, errors = Nil), model.ws.fold(Cmd.None)(scalaDaysClient.disconnectWebSocket))

Tip 1: New Model

contest should adopt the Contest.Empty value, the ws should be None, the errors should be Nil`

Tip 2: New Cmd

If the Model.ws is defined then we have to disconnect it by ScalaDaysClient.disconnectWebSocket, otherwise Cmd.None.

Solution commit https://github.com/xebia-functional/tictactoe/commit/f615cb3b79272dd9d6b41ce252d31c01c9c3ff72
Clone this wiki locally