Skip to content

4. Game Process

Fede Fernández edited this page Sep 6, 2023 · 27 revisions

GAME SEQUENCE

Dummy websocket server

It's time to start building the primary communication protocol between the client and the server. For this workshop, we have decided to use WebSockets.

WebSockets is a communication protocol that provides full-duplex communication channels over a single TCP connection. It enables real-time communication between a client (typically a web browser) and a server, allowing both parties to send messages to each other at any time.

Unlike the traditional HTTP request-response model, where the client sends a request, and the server responds, WebSockets allow for persistent connections. Once the WebSocket connection is established, it remains open, allowing for bi-directional communication. This real-time capability makes WebSockets suitable for various scenarios that require instant data updates or interactive communication.

In our example, WebSockets are an excellent choice since one use case is applications requiring real-time updates. WebSockets allow the server to push updates to the client instantly, eliminating the need for the client to poll the server for new information repeatedly.

In general terms, WebSockets provide a more efficient and responsive alternative to traditional HTTP-based communication when real-time, interactive, or continuous data updates are required.

Exercise 14:

⚠️ Before tackling this exercise: Check out this tag 4-1-websocket-server-exercise-setup 4-1-websocket-server-exercise-setup

In this exercise, we will implement a WebSocket endpoint in our server.

Open the file modules/server/src/main/scala/scaladays/Server.scala. As you can see, we have changed the call to withHttpApp in the EmberServerBuilder object with withHttpWebSocketApp:

def withHttpWebSocketApp(f: WebSocketBuilder[F] => HttpApp[F]): EmberServerBuilder[F]

This method allows us to add HTTP endpoints and also construct WebSocket endpoints with the param WebSocketBuilder. This value is passed now to our WebServer.routes. Let's move to that file (modules/server/src/main/scala/scaladays/server/WebServer.scala).

For this exercise, we have implemented the structure of the endpoint:

case GET -> Root / "player" / PlayerId(playerId) / "join" =>
  val send: fs2.Stream[F, WebSocketFrame] = sendResponse(playerId)
  val receive: fs2.Pipe[F, WebSocketFrame, Unit] =
    in => in.evalMap(processClientMessage(playerId, _))
  ws.build(send, receive)

But as you can see, it's pretty easy. We need to provide three parts:

  • A path definition: case GET -> Root / "player" / PlayerId(playerId) / "join".
  • A send method of type fs2.Stream[F, WebSocketFrame]. It represents the outgoing stream of messages that should be sent to the client.
  • A receive method of type fs2.Pipe[F, WebSocketFrame, Unit]. It is a sink to which the framework will push the incoming WebSocket messages.

Now it's your turn. First, implement the processClientMessage to log the player id with the body sent by the client. You can use the Logger instance. WebSocketFrame is an abstract class, be sure to handle all the possible cases for this scenario (text and close).

Secondly, implement the sendResponse with an infinite stream that will keep the connection open by continuously sending a message every 500 milliseconds.

Once finished, you can test your endpoint with the following URI, for example: ws://127.0.0.1:28082/player/7f9c3605-b0b8-4235-8c86-f15fbc11332d/join. To do that, you can use Postman or the command websocat:

websocat -t ws://127.0.0.1:28082/player/7f9c3605-b0b8-4235-8c86-f15fbc11332d/join
Solution

4-2-websocket-server-exercise-solution

diff

Compacted topics + Consumer

In Apache Kafka, a compacted topic refers to a particular type of topic configuration that retains only the latest value for each key in the topic. This is implemented with a log compaction process that ensures the log retains only the most recent record for each key, discarding older versions of the same key. This behavior differs from a regular Kafka topic, where all messages are typically retained.

Compacted topics are commonly used in scenarios where you want to maintain a current snapshot or summary of the data associated with each key, rather than storing every update. Compacted topics are commonly used in the following scenarios:

  • Event sourcing: They store the latest state of each entity or aggregate. Each event represents a change to the entity, and the compacted topic ensures that only the most recent state for each entity is stored.

  • Change data capture (CDC): CDC is a technique that captures and propagates data changes from one system to another. With compacted topics, change events are stored, ensuring only the latest changes for each record key are retained.

  • Database changelogs: As you can guess, compacted topics can efficiently store changelogs for databases.

Exercise 15:

⚠️ Before tackling this exercise: Check out this tag 4-3-player-compacted-group-id-kafka-exercise-setup 4-3-player-compacted-group-id-kafka-exercise-setup

Compacted topics are helpful in our following case, where we will store players ids waiting to start a new game. We only need the latest state of this entity.

Firstly, go to the file modules/server/src/main/scala/scaladays/config/KafkaSetup.scala. Here we implemented the createTopicUnless method, remember. Now it's time to implement createCompactedTopicUnless. In this exercise, you need to:

  • Implement the createCompactedTopicUnless method. You need to find a way of passing the cleanup.policy property to the new topic creation.
  • Call the newly implemented method in the bootTopics.

The second part of this exercise is a bit tricky. As part of the commit, we have added some code, including the Consumer implementation. This consumer will receive events from the topics and communicate the state to the client.

Consumers are identified and managed with two ids, client id, and group id.

  • Client Id: Identifies an individual Kafka client or consumer. Each consumer within a group should have a unique client id.
  • Group Id: Identifies a consumer group comprising one or multiple consumers. All consumers within a group should have the same group id.

One consequence is that multiple consumers sharing the same group id will receive different messages, as the group is considered a single logical entity. In our exercise, we process the events of a client by creating a consumer per WebSocket and connecting to a specific topic. Consequently, we need to specify the group id when creating the consumer and ensure every consumer receives all the messages from the topic.

Go ahead and update the consumerSettings method in modules/server/src/main/scala/scaladays/kafka/Consumer.scala to set the group id to the proper value.

Solution

4-4-player-compacted-topic-kafka-exercise-solution

diff

EventHandler

EventHandler is the critical piece of our connection between the client and the server. It needs to:

  1. Create a consumer per client
  2. Consume all game events
  3. Filter in those that are related to the current client

Exercise 16:

In this exercise, we're going to tie some pieces together. We want to connect our server stream to the client's WebSocket. We will use the TTTServer algebra to connect the different parts of our code.

Go to the modules/server/src/main/scala/scaladays/server/TTTServer.scala and create a method definition in the trait with the following signature:

PlayerId => fs2.Stream[F, Game]

This method will receive the PlayerId and keep updated the websocket connection with the game information.

Now, it's time to implement the method. Luckily, there's a method in EventHandler with that exactly signature. Implement the method just with a call to EventHandler. processEvent. We don't need to do anything special here.

The second step is to go to the EventHandler (modules/server/src/main/scala/scaladays/kafka/EventHandler.scala) and implement processEvent. Be sure your implementation fulfills the following:

  • Creates a consumer with a different group id per client.
  • Filter out Game events unrelated to the player in the scope.

The third step is to use the TTTServer method implemented in the first step in the WebServer (modules/server/src/main/scala/scaladays/server/WebServer.scala). Find the sendResponse method and change our dummy implementation with an actual implementation that:

  • Passes the input to the TTTServer method.
  • Maps the result into a WebSocketFrame of type Text. The body needs to be a JSON representation of the Game.
Solution

4-5-process-response-exercise-solution

diff

WaitingStream and Serdes

Now it's time to implement a kafka-streams logic. The WaitingStream will perform the following actions:

  • Read players from the input topic and put them in the player topic -> "Player waiting for a game"
  • Read players from the player topic and pair two in an event in the input topic -> "Found a potential match"
  • Read started games from the input topic and remove both players from the player topic -> "No longer waiting, now playing"

Exercise 17:

⚠️ Before tackling this exercise: Check out this tag 4-6-serdes-and-deleteplayers-stream-exercise-setup 4-6-serdes-and-deleteplayers-stream-exercise-setup

For this exercise, we must implement the latest logic in the WaitingStream (read started games from the input topic and remove both players from the player topic) with their corresponding Serdes.

First, let's review the current code state after the new commit.

  1. In the WebServer (modules/server/src/main/scala/scaladays/server/WebServer.scala), we redirect every action from the client to our algebra TTTServer.
  2. In the TTTServer (modules/server/src/main/scala/scaladays/server/TTTServer.scala), we have added one case, JoinGame, that makes a call to EventStorage.waitForMatch. This creates a WaitingForMatch event in the input topic underhood. You can check the code in the EventStorage (modules/server/src/main/scala/scaladays/kafka/EventStorage.scala).
  3. Added PlayerMatchTransformer (modules/server/src/main/scala/scaladays/kafka/stream/PlayerMatchTransformer.scala). This is a Kafka transformer whose main task is to keep a key-value storage (playerMatch) updated with the latest player id.
  4. Added WaitingPlayerStream (modules/server/src/main/scala/scaladays/kafka/stream/WaitingPlayerStream.scala) with the implementation of two of the three logic: A) Read players from the input topic and put them in the player topic (waitingPlayerToPlayerStream) B) Read players from the player topic and pair two in an event in the input topic (playerToMatchStream)

Particular emphasis on playerToMatchStream. This code has a code comment that explains some of the particularities of working with the kafka-streams API. You'll be facing these problems where the Scala and Java API are mixed, creating some strange behaviors when not handled carefully.

Now it's your turn. First, implement the deletePlayersAfterCreateMatch with the following logic: read started games from the input topic and remove both players from the player topic. Remember that TTTEvent contains an Event field indicating the type of event. Check the source code at modules/server/src/main/scala/scaladays/kafka/messages/Events.scala.

Once implemented, if we try to compile, we'll get errors like the following one:

[error]    |No given instance of type org.apache.kafka.streams.kstream.Produced[scaladays.models.ids.PlayerId,
[error]    |  scaladays.models.ids.PlayerId
[error]    |] was found for parameter produced of method to in class KStream.

This is because we now need more serdes instances. Concretely, the key serdes for PlayerId and String.

Open the VulcanSerdes (modules/server/src/main/scala/scaladays/kafka/codecs/VulcanSerdes.scala) and add those needed serdes.

Solution

4-7-serdes-and-deleteplayers-stream-exercise-solution

diff

Websocket connection in client

As we did with the Login process, now it is convenient to analyze how the client expects to communicate with the server and what the parties involved in the Game process are.

notitle

Exercise 18:

⚠️ Before tackling this exercise: Check out this tag 4-8-web-socket-client-server-exercise-setup 4-8-web-socket-client-server-exercise-setup

The server already has the WebSocket endpoint ready for use by the client app, so let's work to define:

  • The possible states necessary to open a WebSocket connection, create a new game, join an existing one, receive a WS error, etc.
  • What changes does the Model demand to represent these new states
  • What events (Msg) do we need to include in order to trigger those new states
  • How the Tyrian app will react to the new events (Msg)
  • What views will show the new state of the app
  • How can we interact with a Tyrian WebSocket via a set of Cmds

Let's continue with the same approach of dividing the exercise into smaller pieces.

Part 0: What are the new states?

First thing first. What are the new states of the app?

State What is happening?
Before joining a game The user has not clicked the "Join Game" button, and the WebSocket connection is not even started
Waiting a game and opening WS The user has already pressed the "Join Game" button, but the WS Connection client/serve not ready
Waiting a game but WS is open The user has already pressed the "Join Game" button, and the WS Connection client/serve is stablished
WebSocket error received The server has responded with an error. We don't have any game yet
Game received through the WS The server has responded with a registered game.

Part 1: The Model

If we recap, the current shape of the Model includes nickname, player, and errors, which don't seem to be enough to describe completely all the new states. We propose to add the following properties:

  • contest: Contest which will include an eventual game. We used Contest as synonymous of Game but avoiding two classes with the same name.
  • ws: Option[WebSocket] which will potentially contain WS connection once is established.

The model might evolve like:

State Nickname Player Errors Contest WS
Before joining a game Non empty Player.Registered Empty Contest.Empty None
Waiting a game and opening WS Non empty Player.Registered Empty Contest.InProgress None
Waiting a game but WS is open Non empty Player.Registered Empty Contest.InProgress Some(ws)
WebSocket error received Non empty Player.Registered Non empty Contest.Empty Some(ws)
Game received through the WS Non empty Player.Registered Empty Contest.Registered Some(ws)
  • Create a new file Contest.scala in modules/client/src/main/scala/scaladays/models with a Contest enum definition, including the following options:
    • Empty
    • InProgress(status: String)
    • Registered(game: Game)
  • Edit the file Model.scala to:
    • Describe the above changes in the case class Model
    • Update the init Model.

Part 2: The Msg

Reminder: The event LoginSuccess(playerId: PlayerId) would bring the Model to the state "Before joining a game", right?

Let's check what other events (Msg) we need to add to trigger all the new states:

State Triggered by the Msg:
Before joining a game LoginSuccess(playerId: PlayerId) (existing)
Waiting a game and opening WS WebSocketStatus(Connecting(playerId)) (new)
Waiting a game but WS is open WebSocketStatus(Connected(ws)) (new)
WebSocket error received WebSocketStatus(ConnectionError(error)) (new)
Game received through the WS GameUpdate(game) (new)
  • Edit the file Messages.scala to add these events.

Part 3: Update

Well, now we have new events to which we must react with model evolutions, as follows:

Msg: Model Cmd
WebSocketStatus(Connecting(playerId)) contest = Contest.InProgress client.connect(playerId)
WebSocketStatus(Connected(ws)) ws = ws Cmd.None
WebSocketStatus(ConnectionError(error)) errors = List(error) Cmd.None
GameUpdate(game) contest = Contest.Registered(game) Cmd.None
  • Edit the file Update.scala to react to these events as described above.
ℹ️ INFO: When the user pushes the Join Game button, the app establishes a WS connection with the server, so the Cmd client.connect(playerId) will be implemented in the part 5

Part 4: Views

As you might expect, we now need to define a view for each possible state the app can adopt. Here are some ideas:

Before joining a game

Screenshot 2023-05-28 at 5 04 36 AM

Waiting a game and opening WS | Waiting a game but WS is open | WebSocket error received

Screenshot 2023-05-28 at 4 00 57 AM

  • In the file WaitingGameView.scala implement the new views as shown above.
  • In the file Main.scala implement the def view(model: ModelIO): Html[Msg] to show the new views accordingly.

Part 5: WebSocket

We want actually to open a WS connection with the Server.

  • Edit the file ScalaDaysClient.scala to implement the function def connect(playerId: PlayerId): Cmd[F, Msg]

Tip 1: Tyrian WebSocket

At the official Tyrian docs, there is a very similar example where WebSocket.connect is used.

Tip 2: The URI of the WebSocket endpoint

As we implemented in the server side, it would be val uri = wsUri / "player" / playerId.show / "join"

Solution

4-9-web-socket-client-server-exercise-solution

diff

Magic monitoring: StartGame in the input topic

Now it's time to see what's happening in the background. If two players perform a login and then initiate a "join game" action, we should see how this is materialized in a "start game" event in the input topic.

Exercise 19:

Follow the next steps for creating two players waiting for a game:

  1. Open a client
  2. Fill in a name and click on "Login"
  3. Click on "Join a game"
  4. Repete the three steps above to create a different player

Now, let's access to Kafka Mafic. You should see your cluster, if not, refer to Exercise 13 to create it.

Expands the topics section

Topics

If you click on the input topic and search for messages, you should be able to verify that new messages are created in the shape we defined in the exercises above. Selecting "Search backward (descending offsets)" will show you the latest messages on top.

Input topic

You should see different messages like the following:

  • Event with a nickname
Message: Object {"time":17013900,"event":{"nickname":"Xebia Rocks"}}
  • Event with a nickname and player id
Message: Object {"time":17013914,"event":{"playerId":"f658e2c4-ae21-433c-bd7d-6123d5b956a2","nickname":"Xebia Rocks"}}
  • Event for waiting for a Game with two player ids
Message: Object {"time":1685367116509,"event":{"gameId":"4a45eae9-33b8-42a0-823b-5d519d581161","crossPlayer":"02cab447-1abc-4745-a32c-3964e1b274cc","circlePlayer":"f658e2c4-ae21-433c-bd7d-6123d5b956a2"}}

The player topic should also contain the different ids.

Player topic

GameStream: matchStream

With the Game event adequately created, it's time to implement the server logic for initiating a new game. This will allow the UI to enter a new state, where players will alternate turns until the end of the game is reached successfully or, hopefully not, due to an error in our system.

Exercise 20:

⚠️ Before tackling this exercise: Check out this tag 4-10-startGame-stream-exercise-setup 4-10-startGame-stream-exercise-setup

First of all, let's review the introduced changes by this commit.

In KafkaSetup (modules/server/src/main/scala/scaladays/config/KafkaSetup.scala), we call createCompactedTopicUnless with the game topic. Since games are modeled as a state machine, we are interested only in the last state of the game.

In KTopology (modules/server/src/main/scala/scaladays/kafka/KTopology.scala), we are now applying the kafka streams definition we're going to implement as part of this exercise.

Finally, in GameStream (modules/server/src/main/scala/scaladays/kafka/stream/GameStream.scala), we add the signature for the new method.

Open GameStream in your editor and complete the matchStream method. You need to:

  • Read StartGame events from the input topic.
  • Generate events Game with the proper values in the game topic.

As usual, once implemented, we need to add the serdes for the transformations. Run compile in an SBT console to see which serdes you are missing and add them in the VulcanSerdes (modules/server/src/main/scala/scaladays/kafka/codecs/VulcanSerdes.scala).

If you retry the Kafka Magic exercise, you should start seeing some Game events in the game topic.

Solution

4-11-startGame-stream-exercise-solution

diff

Receive Game in client

In the client, we have provided the app with the ability to create WS connections, and, consequently, to create new games. However, given the two-way nature of these types of connections, we are not yet processing the events of new games created on the server. And that is what we are going to address in this section.

At this point, we must introduce a nice ability that Tyrian implements to listen to events of different kinds. And Tyrian's documentation defines it as:

Subscriptions (Subs) are used to observe something that changes over time, and to emit discrete messages when something happens.

The Tyrian.WebSocket lets us generate new subscriptions (Sub[F, Msg]) by passing a transformation from WebSocketEvent to Msg. Something like def subscribe[Msg](f: WebSocketEvent => Msg): Sub[F, Msg], which is extremely convenient because we have a potential WebSocket in our Model.

Exercise 21:

⚠️ Before tackling this exercise: Check out this tag 4-12-handle-game-in-client-exercise-setup 4-12-handle-game-in-client-exercise-setup

In the Main.scala we already are checking if our Model contains a WebSocket in order to observe its events:

def subscriptions(model: ModelIO): Sub[IO, Msg] =
  model.ws.fold(Sub.emit(Msg.WebSocketStatus(WebSocketMessage.WebSocketStatus.Disconnected)))(ws =>
    scalaDaysClient.handleWebSocket(ws)
  )

Part 1: Handle the WebSocket

But now it's time to implement scalaDaysClient.handleWebSocket(ws) (from ScalaDaysClient.scala`) taking these tips into account:

  • Remember the signature def subscribe[Msg](f: WebSocketEvent => Msg): Sub[F, Msg] from WebSocket
  • We are going to react to WebSocketEvent.Receive(message) by decoding the message as Game.
    • If works, then we emit Msg.GameUpdate(game)
    • If not. We emit WebSocketStatus.ConnectionError as a Msg.
  • If we receive WebSocketEvent.Heartbeat or WebSocketEvent.Open we emit WebSocketStatus.Nop as a Msg.
  • Finally, we emit WebSocketStatus.ConnectionError(WebSocketError("Unknown websocket message")) for the rest of incoming events.

Part 2: React to Msg.GameUpdate(game)

In the Update.scala we want to react to the new event Msg.GameUpdate(game). Please make sure the Model evolves with contest = Contest.Registered(game) when we receive a Msg.GameUpdate(game).

Part 3: If we receive Game, now we have to show it

The views for the Game are something like these:

The user's turn The opponent's turn
notitle Screenshot 2023-05-28 at 4 01 20 AM

The Hmlt[Msg] for these views are not trivial, so we have partially implemented them at GameView.scala. But there is still one missing part. The method def CellView(position: Position): Html[Msg] needs implementation, and renders a button for the given position.

Tip 1: Enabled or Disabled

The button should be disabled if it not your turn, or if there is a movement at this position.

Tip 2: OnClick when the button is enabled

When the button is enabled, the onClick action should send a Msg.RequestNewMovement(game, newMovement) (which is a new Msg what we'll use in the next section.

Solution

4-13-handle-game-in-client-exercise-solution

diff