Skip to content

Commit

Permalink
a game can run on multiple engines #107
Browse files Browse the repository at this point in the history
  • Loading branch information
isuru89 committed Aug 4, 2022
1 parent 264a57f commit b091db4
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 78 deletions.
4 changes: 1 addition & 3 deletions buildscripts/engine.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ oasis {
maxConsumerThreadPoolSize: 4

gameEventsConsumer = {
# groupId = ""

# If not specified, this will take engine id
# If not specified, this will be engine id
# instanceId = ""

# These props will directly feed to Kafka property configs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,29 +162,21 @@ private void gameSpecificCommand(GameCommand gameCommand) {
int gameId = gameCommand.getGameId();
ExternalPartyImpl eventSource = ExternalParty.EXTERNAL_PARTY.get(getContext().getSystem());
if (status == GameCommand.GameLifecycle.CREATE || status == GameCommand.GameLifecycle.START) {
if (acquireGameLock(gameId, engineContext.id())) {
createGameRuleRefNx(gameId);

if (gamesRunning.add(gameId)) {
contextMap.put(gameId, loadGameContext(gameId));
publishGameState(gameId, GameState.STARTED);
} else {
mainLog.info("Game {} is already running in this engine. So skipping re-registration.", gameId);
}
eventSource.ackGameStateChanged(gameCommand);
mainLog.info("Successfully acquired the game {} to be run on this engine having id {}. Game Event: {}",
gameId, engineContext.id(), status);
createGameRuleRefNx(gameId);

if (gamesRunning.add(gameId)) {
contextMap.put(gameId, loadGameContext(gameId));
publishGameState(gameId, GameState.STARTED);
} else {
mainLog.warning("Cannot acquire game {} for this engine, because it is already owned by another engine!", gameId);
eventSource.nackGameStateChanged(gameCommand);
mainLog.info("Game {} is already running in this engine. So skipping re-registration.", gameId);
}
eventSource.ackGameStateChanged(gameCommand);
mainLog.info("Game engine '{}' is ready to run the game id: {}. Ack game state: {}",
engineContext.id(), gameId, status);
} else if (status == GameCommand.GameLifecycle.REMOVE) {
if (releaseGameLock(gameId, engineContext.id())) {
gamesRunning.remove(gameId);
contextMap.remove(gameId);
} else {
mainLog.info("The game {} is not running in this engine. Skipping remove message.", gameId);
}
gamesRunning.remove(gameId);
contextMap.remove(gameId);

publishGameState(gameId, GameState.STOPPED);
eventSource.ackGameStateChanged(gameCommand);
} else if (status == GameCommand.GameLifecycle.UPDATE) {
Expand All @@ -200,40 +192,6 @@ private void gameSpecificCommand(GameCommand gameCommand) {
}
}

private boolean releaseGameLock(int gameId, String myId) {
String gameIdStr = String.valueOf(gameId);
try (DbContext context = engineContext.getDb().createContext()) {
String owningEngine = context.getValueFromMap(ID.GAME_ENGINES, gameIdStr);
if (myId.equals(owningEngine)) {
mainLog.info("Removing game lock... (gameId: {})", gameId);
context.removeKeyFromMap(ID.GAME_ENGINES, gameIdStr);
publishGameState(gameId, GameState.STOPPED, context);
mainLog.info("Removed game lock! (gameId: {})", gameId);
return true;
}
} catch (IOException e) {
mainLog.error("Cannot acquire game lock! Unexpected error!", e);
}
return false;
}

private boolean acquireGameLock(int gameId, String myId) {
String gameIdStr = String.valueOf(gameId);
try (DbContext context = engineContext.getDb().createContext()) {
boolean locked = context.setIfNotExistsInMap(ID.GAME_ENGINES, gameIdStr, myId);
if (!locked) {
String currentEngineRunning = context.getValueFromMap(ID.GAME_ENGINES, gameIdStr);
mainLog.info("Game {} is currently run by the engine having id {}. My engine id = {}",
gameId, currentEngineRunning, myId);
return myId.equals(currentEngineRunning);
}
return true;
} catch (IOException e) {
mainLog.error("Cannot acquire game lock! Unexpected error!", e);
}
return false;
}

private GameContext loadGameContext(int gameId) {
return new GameContext(gameId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testMultipleEngines() throws OasisException, IOException, ExecutionE

boolean gameRunningE1 = engine1.isGameRunning(1);
boolean gameRunningE2 = engine2.isGameRunning(1);
Assertions.assertTrue(gameRunningE1 ^ gameRunningE2);
Assertions.assertTrue(gameRunningE1 && gameRunningE2);

engine1.stopGame(1);
engine2.stopGame(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
*/
final class KafkaConstants {

static final String GAME_EVENT_CONSUMER_GROUP = "oasis-game-event-consumer-%d";

static final String TOPIC_GAME_EVENTS = "oasis.game.events";
static final String TOPIC_GAME_ANNOUNCEMENTS = "oasis.game.announcements";
static final String TOPIC_ENGINE_RELATED_EVENTS = "oasis.engine.events";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void init(RuntimeContextSupport context, MessageReceiver sinkRef) throws
consumerPool = Executors.newCachedThreadPool();

LOG.debug("Initializing kafka broadcast topic consumer...");
Properties broadcastingProps = KafkaUtils.getBroadcastConsumerProps(kafkaConfigs, engineId);
Properties broadcastingProps = KafkaUtils.getBroadcastConsumerProps(kafkaConfigs);
String instanceId = broadcastingProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
String uniqueId = engineId + "::" + instanceId;
gameBroadcastConsumer = new KafkaBroadcastConsumerRunner(sinkRef, KafkaConstants.TOPIC_GAME_ANNOUNCEMENTS, uniqueId);
Expand Down Expand Up @@ -116,7 +116,7 @@ public void handleGameCommand(GameCommand gameCommand) {
gameEventsConsumers.put(gameId, gameReader);
GameEventHandler handler = new GameEventHandler(sinkRef);

Properties thisConsumerProps = KafkaUtils.createGameEventConsumerProps(providedKafkaConfigs, engineId);
Properties thisConsumerProps = KafkaUtils.createGameEventConsumerProps(providedKafkaConfigs, gameId, engineId);
LOG.info("Subscribing to game {} event topic with configs {}...", gameId, thisConsumerProps);
gameReader.init(thisConsumerProps, handler);
consumerPool.submit(gameReader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ static Properties getFeedConsumerProps(KafkaConfigs kafkaConfigs) {
return props;
}

static Properties getBroadcastConsumerProps(KafkaConfigs kafkaConfigs, String engineId) {
static Properties getBroadcastConsumerProps(KafkaConfigs kafkaConfigs) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Expand Down Expand Up @@ -148,23 +148,18 @@ static Properties getBroadcastConsumerProps(KafkaConfigs kafkaConfigs, String en
return props;
}

static Properties createGameEventConsumerProps(KafkaConfigs kafkaConfigs, String engineId) {
static Properties createGameEventConsumerProps(KafkaConfigs kafkaConfigs, int gameId, String engineId) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigs.getBrokerUrls());
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// if user has specified custom kafka configs for game event topics...
String consumerGroupId = null;
String consumerGroupId = String.format(KafkaConstants.GAME_EVENT_CONSUMER_GROUP, gameId);
if (kafkaConfigs.getGameEventsConsumer() != null) {
if (Utils.isNotEmpty(kafkaConfigs.getGameEventsConsumer().getProps())) {
props.putAll(kafkaConfigs.getGameEventsConsumer().getProps());
}
consumerGroupId = kafkaConfigs.getGameEventsConsumer().getGroupId();
}

if (Texts.isEmpty(consumerGroupId)) {
consumerGroupId = UUID.randomUUID().toString();
}
String consumerGroupInstanceId = Texts.isEmpty(engineId) ? UUID.randomUUID().toString() : engineId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ void getFeedConsumerProps_withoutGroupIdAndProps() {
@Test
void getBroadcastConsumerProps_withGroupId() {
var configs = readConfigs("broadcastConsumer-withGroupId");
var props = KafkaUtils.getBroadcastConsumerProps(configs, "engine-id");
var props = KafkaUtils.getBroadcastConsumerProps(configs);

assertProps(props,
Map.of(BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092",
Expand All @@ -166,7 +166,7 @@ void getBroadcastConsumerProps_withGroupId() {
@Test
void getBroadcastConsumerProps_withoutGroupIdAndProps() {
var configs = readConfigs("broadcastConsumer-withoutGroupIdAndProps");
var props = KafkaUtils.getBroadcastConsumerProps(configs, "engine-id");
var props = KafkaUtils.getBroadcastConsumerProps(configs);

Assertions.assertEquals(7, props.size());
Assertions.assertNotNull(props.getProperty(GROUP_ID_CONFIG));
Expand All @@ -182,31 +182,31 @@ void getBroadcastConsumerProps_withoutGroupIdAndProps() {
}

@Test
void createGameEventConsumerProps_withGroupId() {
void createGameEventConsumerProps_withEngineId() {
var configs = readConfigs("gameEventsConsumer-withGroupId");
var props = KafkaUtils.createGameEventConsumerProps(configs, "engine-id");
var props = KafkaUtils.createGameEventConsumerProps(configs, 1,"engine-id");

assertProps(props,
Map.of(BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092",
KEY_DESERIALIZER_CLASS_CONFIG, DEF_STR_DESERIALIZER,
VALUE_DESERIALIZER_CLASS_CONFIG, DEF_STR_DESERIALIZER,
GROUP_ID_CONFIG, "test-consumer-group-01",
GROUP_ID_CONFIG, "oasis-game-event-consumer-1",
GROUP_INSTANCE_ID_CONFIG, "engine-id",
AUTO_OFFSET_RESET_CONFIG, KafkaConstants.EARLIEST));
}

@Test
void createGameEventConsumerProps_withoutGroupIdAndProps() {
void createGameEventConsumerProps_withoutEngineIdAndProps() {
var configs = readConfigs("gameEventsConsumer-withoutGroupIdAndProps");
var props = KafkaUtils.createGameEventConsumerProps(configs, "engine-id");
var props = KafkaUtils.createGameEventConsumerProps(configs, 2,null);

Assertions.assertEquals(8, props.size());
Assertions.assertNotNull(props.getProperty(GROUP_ID_CONFIG));
Assertions.assertEquals(props.getProperty(GROUP_ID_CONFIG).length(), UUID.randomUUID().toString().length());
Assertions.assertNotNull(props.getProperty(GROUP_INSTANCE_ID_CONFIG));
Assertions.assertEquals(props.getProperty(GROUP_INSTANCE_ID_CONFIG).length(), UUID.randomUUID().toString().length());
Map.of(BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092",
KEY_DESERIALIZER_CLASS_CONFIG, DEF_STR_DESERIALIZER,
VALUE_DESERIALIZER_CLASS_CONFIG, DEF_STR_DESERIALIZER,
GROUP_INSTANCE_ID_CONFIG, "engine-id",
GROUP_ID_CONFIG, "oasis-game-event-consumer-2",
AUTO_OFFSET_RESET_CONFIG, KafkaConstants.EARLIEST,
MAX_POLL_RECORDS_CONFIG, "500",
ENABLE_AUTO_COMMIT_CONFIG, "true").forEach((k, v) -> {
Expand Down

0 comments on commit b091db4

Please sign in to comment.