diff --git a/buildscripts/engine.conf b/buildscripts/engine.conf index 23f8fdd9..a6fbd9f2 100644 --- a/buildscripts/engine.conf +++ b/buildscripts/engine.conf @@ -17,9 +17,7 @@ oasis { maxConsumerThreadPoolSize: 4 gameEventsConsumer = { - # groupId = "" - - # If not specified, this will take engine id + # If not specified, this will be engine id # instanceId = "" # These props will directly feed to Kafka property configs diff --git a/engine/src/main/java/io/github/oasis/engine/actors/OasisSupervisor.java b/engine/src/main/java/io/github/oasis/engine/actors/OasisSupervisor.java index 4579b8dd..23602756 100644 --- a/engine/src/main/java/io/github/oasis/engine/actors/OasisSupervisor.java +++ b/engine/src/main/java/io/github/oasis/engine/actors/OasisSupervisor.java @@ -162,29 +162,21 @@ private void gameSpecificCommand(GameCommand gameCommand) { int gameId = gameCommand.getGameId(); ExternalPartyImpl eventSource = ExternalParty.EXTERNAL_PARTY.get(getContext().getSystem()); if (status == GameCommand.GameLifecycle.CREATE || status == GameCommand.GameLifecycle.START) { - if (acquireGameLock(gameId, engineContext.id())) { - createGameRuleRefNx(gameId); - - if (gamesRunning.add(gameId)) { - contextMap.put(gameId, loadGameContext(gameId)); - publishGameState(gameId, GameState.STARTED); - } else { - mainLog.info("Game {} is already running in this engine. So skipping re-registration.", gameId); - } - eventSource.ackGameStateChanged(gameCommand); - mainLog.info("Successfully acquired the game {} to be run on this engine having id {}. Game Event: {}", - gameId, engineContext.id(), status); + createGameRuleRefNx(gameId); + + if (gamesRunning.add(gameId)) { + contextMap.put(gameId, loadGameContext(gameId)); + publishGameState(gameId, GameState.STARTED); } else { - mainLog.warning("Cannot acquire game {} for this engine, because it is already owned by another engine!", gameId); - eventSource.nackGameStateChanged(gameCommand); + mainLog.info("Game {} is already running in this engine. So skipping re-registration.", gameId); } + eventSource.ackGameStateChanged(gameCommand); + mainLog.info("Game engine '{}' is ready to run the game id: {}. Ack game state: {}", + engineContext.id(), gameId, status); } else if (status == GameCommand.GameLifecycle.REMOVE) { - if (releaseGameLock(gameId, engineContext.id())) { - gamesRunning.remove(gameId); - contextMap.remove(gameId); - } else { - mainLog.info("The game {} is not running in this engine. Skipping remove message.", gameId); - } + gamesRunning.remove(gameId); + contextMap.remove(gameId); + publishGameState(gameId, GameState.STOPPED); eventSource.ackGameStateChanged(gameCommand); } else if (status == GameCommand.GameLifecycle.UPDATE) { @@ -200,40 +192,6 @@ private void gameSpecificCommand(GameCommand gameCommand) { } } - private boolean releaseGameLock(int gameId, String myId) { - String gameIdStr = String.valueOf(gameId); - try (DbContext context = engineContext.getDb().createContext()) { - String owningEngine = context.getValueFromMap(ID.GAME_ENGINES, gameIdStr); - if (myId.equals(owningEngine)) { - mainLog.info("Removing game lock... (gameId: {})", gameId); - context.removeKeyFromMap(ID.GAME_ENGINES, gameIdStr); - publishGameState(gameId, GameState.STOPPED, context); - mainLog.info("Removed game lock! (gameId: {})", gameId); - return true; - } - } catch (IOException e) { - mainLog.error("Cannot acquire game lock! Unexpected error!", e); - } - return false; - } - - private boolean acquireGameLock(int gameId, String myId) { - String gameIdStr = String.valueOf(gameId); - try (DbContext context = engineContext.getDb().createContext()) { - boolean locked = context.setIfNotExistsInMap(ID.GAME_ENGINES, gameIdStr, myId); - if (!locked) { - String currentEngineRunning = context.getValueFromMap(ID.GAME_ENGINES, gameIdStr); - mainLog.info("Game {} is currently run by the engine having id {}. My engine id = {}", - gameId, currentEngineRunning, myId); - return myId.equals(currentEngineRunning); - } - return true; - } catch (IOException e) { - mainLog.error("Cannot acquire game lock! Unexpected error!", e); - } - return false; - } - private GameContext loadGameContext(int gameId) { return new GameContext(gameId); } diff --git a/engine/src/test/java/io/github/oasis/engine/MultiEngineTest.java b/engine/src/test/java/io/github/oasis/engine/MultiEngineTest.java index 67c60aaa..5012fdfb 100644 --- a/engine/src/test/java/io/github/oasis/engine/MultiEngineTest.java +++ b/engine/src/test/java/io/github/oasis/engine/MultiEngineTest.java @@ -89,7 +89,7 @@ public void testMultipleEngines() throws OasisException, IOException, ExecutionE boolean gameRunningE1 = engine1.isGameRunning(1); boolean gameRunningE2 = engine2.isGameRunning(1); - Assertions.assertTrue(gameRunningE1 ^ gameRunningE2); + Assertions.assertTrue(gameRunningE1 && gameRunningE2); engine1.stopGame(1); engine2.stopGame(1); diff --git a/externals/kafka-stream/src/main/java/io/github/oasis/ext/kafkastream/KafkaConstants.java b/externals/kafka-stream/src/main/java/io/github/oasis/ext/kafkastream/KafkaConstants.java index af7361df..0befbece 100644 --- a/externals/kafka-stream/src/main/java/io/github/oasis/ext/kafkastream/KafkaConstants.java +++ b/externals/kafka-stream/src/main/java/io/github/oasis/ext/kafkastream/KafkaConstants.java @@ -27,6 +27,8 @@ */ final class KafkaConstants { + static final String GAME_EVENT_CONSUMER_GROUP = "oasis-game-event-consumer-%d"; + static final String TOPIC_GAME_EVENTS = "oasis.game.events"; static final String TOPIC_GAME_ANNOUNCEMENTS = "oasis.game.announcements"; static final String TOPIC_ENGINE_RELATED_EVENTS = "oasis.engine.events"; diff --git a/externals/kafka-stream/src/main/java/io/github/oasis/ext/kafkastream/KafkaStreamConsumer.java b/externals/kafka-stream/src/main/java/io/github/oasis/ext/kafkastream/KafkaStreamConsumer.java index 4c1b08a5..abade9d7 100644 --- a/externals/kafka-stream/src/main/java/io/github/oasis/ext/kafkastream/KafkaStreamConsumer.java +++ b/externals/kafka-stream/src/main/java/io/github/oasis/ext/kafkastream/KafkaStreamConsumer.java @@ -86,7 +86,7 @@ public void init(RuntimeContextSupport context, MessageReceiver sinkRef) throws consumerPool = Executors.newCachedThreadPool(); LOG.debug("Initializing kafka broadcast topic consumer..."); - Properties broadcastingProps = KafkaUtils.getBroadcastConsumerProps(kafkaConfigs, engineId); + Properties broadcastingProps = KafkaUtils.getBroadcastConsumerProps(kafkaConfigs); String instanceId = broadcastingProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG); String uniqueId = engineId + "::" + instanceId; gameBroadcastConsumer = new KafkaBroadcastConsumerRunner(sinkRef, KafkaConstants.TOPIC_GAME_ANNOUNCEMENTS, uniqueId); @@ -116,7 +116,7 @@ public void handleGameCommand(GameCommand gameCommand) { gameEventsConsumers.put(gameId, gameReader); GameEventHandler handler = new GameEventHandler(sinkRef); - Properties thisConsumerProps = KafkaUtils.createGameEventConsumerProps(providedKafkaConfigs, engineId); + Properties thisConsumerProps = KafkaUtils.createGameEventConsumerProps(providedKafkaConfigs, gameId, engineId); LOG.info("Subscribing to game {} event topic with configs {}...", gameId, thisConsumerProps); gameReader.init(thisConsumerProps, handler); consumerPool.submit(gameReader); diff --git a/externals/kafka-stream/src/main/java/io/github/oasis/ext/kafkastream/KafkaUtils.java b/externals/kafka-stream/src/main/java/io/github/oasis/ext/kafkastream/KafkaUtils.java index d94ccff3..e120d27e 100644 --- a/externals/kafka-stream/src/main/java/io/github/oasis/ext/kafkastream/KafkaUtils.java +++ b/externals/kafka-stream/src/main/java/io/github/oasis/ext/kafkastream/KafkaUtils.java @@ -118,7 +118,7 @@ static Properties getFeedConsumerProps(KafkaConfigs kafkaConfigs) { return props; } - static Properties getBroadcastConsumerProps(KafkaConfigs kafkaConfigs, String engineId) { + static Properties getBroadcastConsumerProps(KafkaConfigs kafkaConfigs) { Properties props = new Properties(); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); @@ -148,23 +148,18 @@ static Properties getBroadcastConsumerProps(KafkaConfigs kafkaConfigs, String en return props; } - static Properties createGameEventConsumerProps(KafkaConfigs kafkaConfigs, String engineId) { + static Properties createGameEventConsumerProps(KafkaConfigs kafkaConfigs, int gameId, String engineId) { Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigs.getBrokerUrls()); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // if user has specified custom kafka configs for game event topics... - String consumerGroupId = null; + String consumerGroupId = String.format(KafkaConstants.GAME_EVENT_CONSUMER_GROUP, gameId); if (kafkaConfigs.getGameEventsConsumer() != null) { if (Utils.isNotEmpty(kafkaConfigs.getGameEventsConsumer().getProps())) { props.putAll(kafkaConfigs.getGameEventsConsumer().getProps()); } - consumerGroupId = kafkaConfigs.getGameEventsConsumer().getGroupId(); - } - - if (Texts.isEmpty(consumerGroupId)) { - consumerGroupId = UUID.randomUUID().toString(); } String consumerGroupInstanceId = Texts.isEmpty(engineId) ? UUID.randomUUID().toString() : engineId; diff --git a/externals/kafka-stream/src/test/java/io/github/oasis/ext/kafkastream/KafkaUtilsTest.java b/externals/kafka-stream/src/test/java/io/github/oasis/ext/kafkastream/KafkaUtilsTest.java index 06bc1029..4180a5b3 100644 --- a/externals/kafka-stream/src/test/java/io/github/oasis/ext/kafkastream/KafkaUtilsTest.java +++ b/externals/kafka-stream/src/test/java/io/github/oasis/ext/kafkastream/KafkaUtilsTest.java @@ -151,7 +151,7 @@ void getFeedConsumerProps_withoutGroupIdAndProps() { @Test void getBroadcastConsumerProps_withGroupId() { var configs = readConfigs("broadcastConsumer-withGroupId"); - var props = KafkaUtils.getBroadcastConsumerProps(configs, "engine-id"); + var props = KafkaUtils.getBroadcastConsumerProps(configs); assertProps(props, Map.of(BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092", @@ -166,7 +166,7 @@ void getBroadcastConsumerProps_withGroupId() { @Test void getBroadcastConsumerProps_withoutGroupIdAndProps() { var configs = readConfigs("broadcastConsumer-withoutGroupIdAndProps"); - var props = KafkaUtils.getBroadcastConsumerProps(configs, "engine-id"); + var props = KafkaUtils.getBroadcastConsumerProps(configs); Assertions.assertEquals(7, props.size()); Assertions.assertNotNull(props.getProperty(GROUP_ID_CONFIG)); @@ -182,31 +182,31 @@ void getBroadcastConsumerProps_withoutGroupIdAndProps() { } @Test - void createGameEventConsumerProps_withGroupId() { + void createGameEventConsumerProps_withEngineId() { var configs = readConfigs("gameEventsConsumer-withGroupId"); - var props = KafkaUtils.createGameEventConsumerProps(configs, "engine-id"); + var props = KafkaUtils.createGameEventConsumerProps(configs, 1,"engine-id"); assertProps(props, Map.of(BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092", KEY_DESERIALIZER_CLASS_CONFIG, DEF_STR_DESERIALIZER, VALUE_DESERIALIZER_CLASS_CONFIG, DEF_STR_DESERIALIZER, - GROUP_ID_CONFIG, "test-consumer-group-01", + GROUP_ID_CONFIG, "oasis-game-event-consumer-1", GROUP_INSTANCE_ID_CONFIG, "engine-id", AUTO_OFFSET_RESET_CONFIG, KafkaConstants.EARLIEST)); } @Test - void createGameEventConsumerProps_withoutGroupIdAndProps() { + void createGameEventConsumerProps_withoutEngineIdAndProps() { var configs = readConfigs("gameEventsConsumer-withoutGroupIdAndProps"); - var props = KafkaUtils.createGameEventConsumerProps(configs, "engine-id"); + var props = KafkaUtils.createGameEventConsumerProps(configs, 2,null); Assertions.assertEquals(8, props.size()); - Assertions.assertNotNull(props.getProperty(GROUP_ID_CONFIG)); - Assertions.assertEquals(props.getProperty(GROUP_ID_CONFIG).length(), UUID.randomUUID().toString().length()); + Assertions.assertNotNull(props.getProperty(GROUP_INSTANCE_ID_CONFIG)); + Assertions.assertEquals(props.getProperty(GROUP_INSTANCE_ID_CONFIG).length(), UUID.randomUUID().toString().length()); Map.of(BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092", KEY_DESERIALIZER_CLASS_CONFIG, DEF_STR_DESERIALIZER, VALUE_DESERIALIZER_CLASS_CONFIG, DEF_STR_DESERIALIZER, - GROUP_INSTANCE_ID_CONFIG, "engine-id", + GROUP_ID_CONFIG, "oasis-game-event-consumer-2", AUTO_OFFSET_RESET_CONFIG, KafkaConstants.EARLIEST, MAX_POLL_RECORDS_CONFIG, "500", ENABLE_AUTO_COMMIT_CONFIG, "true").forEach((k, v) -> {