diff --git a/diambra/arena/arena_gym.py b/diambra/arena/arena_gym.py index ef8b9fa7..c840198f 100644 --- a/diambra/arena/arena_gym.py +++ b/diambra/arena/arena_gym.py @@ -9,9 +9,8 @@ from diambra.arena.env_settings import EnvironmentSettings1P, EnvironmentSettings2P from typing import Union, Any, Dict, List -# DIAMBRA Env Gym class DiambraGymBase(gym.Env): - """Diambra Environment gym interface""" + """Diambra Environment gymnasium base interface""" metadata = {"render_modes": ["human", "rgb_array"]} _frame = None _last_action = None @@ -60,76 +59,6 @@ def __init__(self, env_settings: Union[EnvironmentSettings1P, EnvironmentSetting self.max_delta_health = self.env_info.ram_states[k].max - self.env_info.ram_states[k].min break - def _get_ram_states_obs_dict(self): - player_spec_dict = {} - generic_dict = {} - # Adding env additional observations (side-specific) - for k, v in self.env_info.ram_states.items(): - if k[-2:] == "P1": - target_dict = player_spec_dict - knew = "own_" + k[:-2] - elif k[-2:] == "P2": - target_dict = player_spec_dict - knew = "opp_" + k[:-2] - else: - target_dict = generic_dict - knew = k - - # Discrete spaces (binary / categorical) - if v.type == 0 or v.type == 2: - target_dict[knew] = gym.spaces.Discrete(v.max + 1) - elif v.type == 1: # Box spaces - target_dict[knew] = gym.spaces.Box(low=v.min, high=v.max, shape=(1,), dtype=np.int32) - else: - raise RuntimeError("Only Discrete (Binary/Categorical) | Box Spaces allowed") - - player_spec_dict["action_move"] = gym.spaces.Discrete(self.n_actions[0]) - player_spec_dict["action_attack"] = gym.spaces.Discrete(self.n_actions[1]) - - return generic_dict, player_spec_dict - - # Get frame - def _get_frame(self, response): - self._frame = np.frombuffer(response.observation.frame, dtype='uint8').reshape(self.env_info.frame_shape.h, \ - self.env_info.frame_shape.w, \ - self.env_info.frame_shape.c) - return self._frame - - # Get info - def _get_info(self, response): - info = dict(response.info.game_states) - info["settings"] = self.env_settings.pb_model - return info - - # Integrate player specific RAM states into observation - def _player_specific_ram_states_integration(self, response, idx): - player_spec_dict = {} - generic_dict = {} - - # Adding env additional observations (side-specific) - player_role = self.env_settings.pb_model.variable_env_settings.player_env_settings[idx].role - for k, v in self.env_info.ram_states.items(): - if ("P1" in k or "P2" in k): - target_dict = player_spec_dict - if k[-2:] == player_role: - knew = "own_" + k[:-2] - else: - knew = "opp_" + k[:-2] - else: - target_dict = generic_dict - knew = k - - # Box spaces - if v.type == 1: - target_dict[knew] = np.array([response.observation.ram_states[k]], dtype=np.int32) - else: # Discrete spaces (binary / categorical) - target_dict[knew] = response.observation.ram_states[k] - - player_spec_dict["action_move"] = self._last_action[idx][0] - player_spec_dict["action_attack"] = self._last_action[idx][1] - - return generic_dict, player_spec_dict - # Return env action list def get_actions_tuples(self): return self.actions_tuples @@ -217,8 +146,78 @@ def close(self): cv2.destroyAllWindows() self.arena_engine.close() -# DIAMBRA Gym 1P class + def _get_ram_states_obs_dict(self): + player_spec_dict = {} + generic_dict = {} + # Adding env additional observations (side-specific) + for k, v in self.env_info.ram_states.items(): + if k.endswith("P1"): + target_dict = player_spec_dict + knew = "own_" + k[:-2] + elif k.endswith("P2"): + target_dict = player_spec_dict + knew = "opp_" + k[:-2] + else: + target_dict = generic_dict + knew = k + + # Discrete spaces (binary / categorical) + if v.type == 0 or v.type == 2: + target_dict[knew] = gym.spaces.Discrete(v.max + 1) + elif v.type == 1: # Box spaces + target_dict[knew] = gym.spaces.Box(low=v.min, high=v.max, shape=(1,), dtype=np.int32) + else: + raise RuntimeError("Only Discrete (Binary/Categorical) | Box Spaces allowed") + + player_spec_dict["action_move"] = gym.spaces.Discrete(self.n_actions[0]) + player_spec_dict["action_attack"] = gym.spaces.Discrete(self.n_actions[1]) + + return generic_dict, player_spec_dict + + # Get frame + def _get_frame(self, response): + self._frame = np.frombuffer(response.observation.frame, dtype='uint8').reshape(self.env_info.frame_shape.h, \ + self.env_info.frame_shape.w, \ + self.env_info.frame_shape.c) + return self._frame + + # Get info + def _get_info(self, response): + info = dict(response.info.game_states) + info["settings"] = self.env_settings.pb_model + return info + + # Integrate player specific RAM states into observation + def _player_specific_ram_states_integration(self, response, idx): + player_spec_dict = {} + generic_dict = {} + + # Adding env additional observations (side-specific) + player_role = self.env_settings.pb_model.variable_env_settings.player_env_settings[idx].role + for k, v in self.env_info.ram_states.items(): + if (k.endswith("P1") or k.endswith("P2")): + target_dict = player_spec_dict + if k[-2:] == player_role: + knew = "own_" + k[:-2] + else: + knew = "opp_" + k[:-2] + else: + target_dict = generic_dict + knew = k + + # Box spaces + if v.type == 1: + target_dict[knew] = np.array([response.observation.ram_states[k]], dtype=np.int32) + else: # Discrete spaces (binary / categorical) + target_dict[knew] = response.observation.ram_states[k] + + player_spec_dict["action_move"] = self._last_action[idx][0] + player_spec_dict["action_attack"] = self._last_action[idx][1] + + return generic_dict, player_spec_dict + class DiambraGym1P(DiambraGymBase): + """Diambra Environment gymnasium single agent interface""" def __init__(self, env_settings): super().__init__(env_settings) @@ -248,15 +247,6 @@ def __init__(self, env_settings): self.action_space = gym.spaces.Discrete(self.n_actions[0] + self.n_actions[1] - 1) self.logger.debug("Using Discrete action space") - def _get_obs(self, response): - observation = {} - observation["frame"] = self._get_frame(response) - generic_obs_dict, player_obs_dict = self._player_specific_ram_states_integration(response, 0) - observation.update(generic_obs_dict) - observation.update(player_obs_dict) - - return observation - # Return the no-op action def get_no_op_action(self): if isinstance(self.action_space, gym.spaces.MultiDiscrete): @@ -276,8 +266,17 @@ def step(self, action: Union[int, List[int]]): return observation, response.reward, response.info.game_states["episode_done"], False, self._get_info(response) -# DIAMBRA Gym 2P Class + def _get_obs(self, response): + observation = {} + observation["frame"] = self._get_frame(response) + generic_obs_dict, player_obs_dict = self._player_specific_ram_states_integration(response, 0) + observation.update(generic_obs_dict) + observation.update(player_obs_dict) + + return observation + class DiambraGym2P(DiambraGymBase): + """Diambra Environment gymnasium multi-agent interface""" def __init__(self, env_settings): super().__init__(env_settings) @@ -298,36 +297,17 @@ def __init__(self, env_settings): # Action space # Dictionary - action_space_dict = {} - for idx in range(2): - if env_settings.action_space[idx] == "multi_discrete": - action_space_dict["agent_{}".format(idx)] = gym.spaces.MultiDiscrete(self.n_actions) - elif env_settings.action_space[idx] == "discrete": - action_space_dict["agent_{}".format(idx)] = gym.spaces.Discrete(self.n_actions[0] + self.n_actions[1] - 1) - self.logger.debug("Using {} action space for agent_{}".format(env_settings.action_space[idx], idx)) - + action_spaces_values = {"multi_discrete": gym.spaces.MultiDiscrete(self.n_actions), + "discrete": gym.spaces.Discrete(self.n_actions[0] + self.n_actions[1] - 1)} + action_space_dict = self._update_dict(action_spaces_values) + self.logger.debug("Using the following action spaces: {}".format(action_space_dict)) self.action_space = gym.spaces.Dict(action_space_dict) - def _get_obs(self, response): - observation = {} - observation["frame"] = self._get_frame(response) - for idx in range(2): - generic_obs_dict, player_obs_dict = self._player_specific_ram_states_integration(response, idx) - observation["agent_{}".format(idx)] = player_obs_dict - observation.update(generic_obs_dict) - - return observation - # Return the no-op action def get_no_op_action(self): - no_op_action = {} - for idx in range(2): - if self.env_settings.action_space[idx] == "multi_discrete": - no_op_action["agent_{}".format(idx)] = [0, 0] - elif self.env_settings.action_space[idx] == "discrete": - no_op_action["agent_{}".format(idx)] = 0 - - return no_op_action + no_op_values = {"multi_discrete": [0, 0], + "discrete": 0} + return self._update_dict(no_op_values) # Step the environment def step(self, actions: Dict[str, Union[int, List[int]]]): @@ -342,4 +322,21 @@ def step(self, actions: Dict[str, Union[int, List[int]]]): response = self.arena_engine.step(self._last_action) observation = self._get_obs(response) - return observation, response.reward, response.info.game_states["game_done"], False, self._get_info(response) \ No newline at end of file + return observation, response.reward, response.info.game_states["game_done"], False, self._get_info(response) + + def _update_dict(self, values_dict): + out_dict = {} + for idx, action_space in enumerate(self.env_settings.action_space): + out_dict["agent_{}".format(idx)] = values_dict[action_space] + + return out_dict + + def _get_obs(self, response): + observation = {} + observation["frame"] = self._get_frame(response) + for idx in range(self.env_settings.n_players): + generic_obs_dict, player_obs_dict = self._player_specific_ram_states_integration(response, idx) + observation["agent_{}".format(idx)] = player_obs_dict + observation.update(generic_obs_dict) + + return observation \ No newline at end of file diff --git a/diambra/arena/env_settings.py b/diambra/arena/env_settings.py index 777683b3..686a1329 100644 --- a/diambra/arena/env_settings.py +++ b/diambra/arena/env_settings.py @@ -23,6 +23,7 @@ def check_val_in_list(key, value, valid_list): @dataclass class EnvironmentSettings: + """Generic Environment Settings Class""" env_info = None games_dict = None @@ -183,7 +184,7 @@ def _process_random_values(self): @dataclass class EnvironmentSettings1P(EnvironmentSettings): - # Player level + """Single Agent Environment Settings Class""" role: str = "Random" characters: Union[str, Tuple[str], Tuple[str, str], Tuple[str, str, str]] = ("Random", "Random", "Random") outfits: int = 1 @@ -255,7 +256,7 @@ def _get_player_specific_values(self): @dataclass class EnvironmentSettings2P(EnvironmentSettings): - # Player level + """Single Agent Environment Settings Class""" role: Tuple[str, str] = ("Random", "Random") characters: Union[Tuple[str, str], Tuple[Tuple[str], Tuple[str]], Tuple[Tuple[str, str], Tuple[str, str]], @@ -299,14 +300,14 @@ def _process_random_values(self): super()._process_random_values() characters_tmp = [[],[]] - for idx in range(2): - sampled_characters = self._sample_characters() + for idx, characters in enumerate(self.characters): + sampled_characters = self._sample_characters() for jdx in range(3): - if self.characters[idx][jdx] == "Random": + if characters[jdx] == "Random": characters_tmp[idx].append(sampled_characters[jdx]) else: - characters_tmp[idx].append(self.characters[idx][jdx]) + characters_tmp[idx].append(characters[jdx]) self.characters = (tuple(characters_tmp[0]), tuple(characters_tmp[1])) @@ -324,8 +325,8 @@ def _process_random_values(self): self.ultimate_style = tuple([[random.choice(list(range(1, 3))) if self.ultimate_style[idx][jdx] == "Random" else self.ultimate_style[idx][jdx] for jdx in range(3)] for idx in range(2)]) def _get_action_spaces(self): - action_spaces = [model.EnvSettings.ActionSpace.ACTION_SPACE_DISCRETE if self.action_space[idx] == "discrete" else \ - model.EnvSettings.ActionSpace.ACTION_SPACE_MULTI_DISCRETE for idx in range(2)] + action_spaces = [model.EnvSettings.ActionSpace.ACTION_SPACE_DISCRETE if action_space == "discrete" else \ + model.EnvSettings.ActionSpace.ACTION_SPACE_MULTI_DISCRETE for action_space in self.action_space] return action_spaces diff --git a/diambra/arena/utils/controller.py b/diambra/arena/utils/controller.py index a7cb16d0..d0d63a65 100644 --- a/diambra/arena/utils/controller.py +++ b/diambra/arena/utils/controller.py @@ -5,6 +5,7 @@ from inputs import devices import pickle from os.path import expanduser +import logging home_dir = expanduser("~") CONFIG_FILE_PATH = os.path.join(home_dir, '.diambra/config/deviceConfig.cfg') @@ -66,9 +67,11 @@ class DiambraDevice(Thread): # def class type thread def __init__(self, device, action_list=(("NoMove", "Left", "UpLeft", "Up", "UpRight", "Right", "DownRight", "Down", "DownLeft"), ("But0", "But1", "But2", "But3", "But4", "But5", "But6", "But7", "But8")), cfg=["But1", "But2", "But3", "But4", "But5", "But6", "But7", "But8"], - force_configure=False, skip_configure=False): + force_configure=False, skip_configure=False, logging_level=logging.INFO): # thread init class (don't forget this) Thread.__init__(self, daemon=True) + self.logger = logging.getLogger(__name__) + self.logger.basicConfig(logging_level) self.stop_event = Event() @@ -117,7 +120,7 @@ def __init__(self, device, action_list=(("NoMove", "Left", "UpLeft", "Up", "UpRi ans = input("Want to reconfigure the device? (y/n): ") if ans == "y": - print("Restarting device configuration") + self.logger.info("Restarting device configuration") # Run device configuration self.configure() else: @@ -127,12 +130,11 @@ def __init__(self, device, action_list=(("NoMove", "Left", "UpLeft", "Up", "UpRi self.save_device_configuration() - print("Diambra device initialized on device {} [{}]".format(self.device.name, self.device_id)) + self.logger.info("Diambra device initialized on device {} [{}]".format(self.device.name, self.device_id)) # Show device events def show_device_events(self, event_codes_to_skip=[], event_code_to_show=None): - - print("Use device to see events") + self.logger.info("Use device to see events") while True: for event in self.device.read(): if event.ev_type != "Sync" and event.ev_type != "Misc": @@ -140,7 +142,7 @@ def show_device_events(self, event_codes_to_skip=[], event_code_to_show=None): if event_code_to_show is not None: if event.code != event_code_to_show: continue - print("Event type: {}, event code: {}, event state: {}".format(event.ev_type, event.code, event.state)) + self.logger.info("Event type: {}, event code: {}, event state: {}".format(event.ev_type, event.code, event.state)) # Prepare device config dict to be saved def process_device_dict_for_save(self): @@ -149,7 +151,7 @@ def process_device_dict_for_save(self): # Save device configuration def save_device_configuration(self): - print("Saving configuration in {}".format(self.device_config_file_path)) + self.logger.info("Saving configuration in {}".format(self.device_config_file_path)) # Convert device config dictionary cfg_dict_to_save = self.process_device_dict_for_save() @@ -173,12 +175,11 @@ def save_device_configuration(self): # Load all devices configuration def load_all_device_configurations(self): - if os.path.exists(self.device_config_file_path): with open(self.device_config_file_path, "rb") as cfg_file: cfg_file_dict_list = pickle.load(cfg_file) else: - print("No device configuration file found in: {}".format(os.path.dirname(self.device_config_file_path))) + self.logger.info("No device configuration file found in: {}".format(os.path.dirname(self.device_config_file_path))) os.makedirs(os.path.dirname(self.device_config_file_path), exist_ok=True) cfg_file_dict_list = [] @@ -194,8 +195,7 @@ def configure(self): # Configuration Test def config_test(self): - - print("Press Start to end configuration test") + self.logger.info("Press Start to end configuration test") # Execute run function in thread mode thread = Thread(target=self.run, args=()) @@ -203,19 +203,17 @@ def config_test(self): thread.start() while True: - actions = self.get_all_actions() if actions[0] != 0: - print("Move action = {}. (Press START to end configuration test).".format(self.all_actions_list[0][actions[0]])) + self.logger.info("Move action = {}. (Press START to end configuration test).".format(self.all_actions_list[0][actions[0]])) if actions[1] != 0: - print("Attack action = {}. (Press START to end configuration test).".format(self.all_actions_list[1][actions[1]])) + self.logger.info("Attack action = {}. (Press START to end configuration test).".format(self.all_actions_list[1][actions[1]])) if actions[2] != 0: break # Creating hash dictionary def compose_hash_dict(self, dictionary, hash_elem): - key_val_list = [] key_val = "" @@ -381,40 +379,40 @@ def load_device_configuration(self): self.device_dict["Arrow"][item[0]] = item[1] config_found = True - print("Device configuration file found in: {}".format(os.path.dirname(self.device_config_file_path))) - print("Device configuration file loaded.") + self.logger.info("Device configuration file found in: {}".format(os.path.dirname(self.device_config_file_path))) + self.logger.info("Device configuration file loaded.") except: - print("Invalid device configuration file found in: {}".format(os.path.dirname(self.device_config_file_path))) + self.logger.info("Invalid device configuration file found in: {}".format(os.path.dirname(self.device_config_file_path))) if not config_found: - print("Configuration for this device not present in device configuration file") + self.logger.info("Configuration for this device not present in device configuration file") return config_found # Configure device buttons def configure(self): - print("") - print("") - print("Configuring device {}".format(self.device)) - print("") - print("# Buttons CFG file") - print(" _______ ") - print(" B7 __|digital|__ B8 ") - print(" B5 |buttons| B6 ") - print(" / \ ") - print(" SELECT START ") - print(" UP B1 ") - print(" | ") - print(" LEFT-- --RIGHT B4 B2") - print(" | ") - print(" DOWN B3 ") - print(" __/____ ") - print(" |digital| ") - print(" | move | ") - print(" ------- ") - print("") - print("") + self.logger.info("") + self.logger.info("") + self.logger.info("Configuring device {}".format(self.device)) + self.logger.info("") + self.logger.info("# Buttons CFG file") + self.logger.info(" _______ ") + self.logger.info(" B7 __|digital|__ B8 ") + self.logger.info(" B5 |buttons| B6 ") + self.logger.info(" / \ ") + self.logger.info(" SELECT START ") + self.logger.info(" UP B1 ") + self.logger.info(" | ") + self.logger.info(" LEFT-- --RIGHT B4 B2") + self.logger.info(" | ") + self.logger.info(" DOWN B3 ") + self.logger.info(" __/____ ") + self.logger.info(" |digital| ") + self.logger.info(" | move | ") + self.logger.info(" ------- ") + self.logger.info("") + self.logger.info("") self.code_to_group_map = defaultdict(lambda: "") self.device_dict = {} @@ -423,8 +421,8 @@ def configure(self): # Buttons configuration # Start and Select - print("Return/Enter key is not-allowed and would cause the program to stop.") - print("Press START button") + self.logger.info("Return/Enter key is not-allowed and would cause the program to stop.") + self.logger.info("Press START button") but_not_set = True start_set = False while but_not_set: @@ -435,13 +433,13 @@ def configure(self): raise Exception("Return/Enter not-allowed, aborting.") self.start_code = event.code start_set = True - print("Start associated with {}".format(event.code)) + self.logger.info("Start associated with {}".format(event.code)) else: if start_set == True: but_not_set = False break - print("Press SELECT button (Start to skip)") + self.logger.info("Press SELECT button (Start to skip)") but_not_set = True while but_not_set: for event in self.device.read(): @@ -450,11 +448,11 @@ def configure(self): if (event.code == "KEY_ENTER"): raise Exception("Return/Enter not-allowed, aborting.") self.select_code = event.code - print("Select associated with {}".format(event.code)) + self.logger.info("Select associated with {}".format(event.code)) else: but_not_set = False if event.code == self.start_code: - print("Select association skipped") + self.logger.info("Select association skipped") break # Attack buttons @@ -464,7 +462,7 @@ def configure(self): if end_flag: break - print("Press B{} button (SELECT / START to end configuration)".format(idx+1)) + self.logger.info("Press B{} button (SELECT / START to end configuration)".format(idx+1)) but_not_set = True @@ -477,20 +475,20 @@ def configure(self): if event.ev_type == "Key": if (event.code != self.start_code and event.code != self.select_code): if event.state > 0: - print("Button B{}, event code = {}".format(idx+1, event.code)) + self.logger.info("Button B{}, event code = {}".format(idx+1, event.code)) self.device_dict["Key"][event.code] = idx self.code_to_group_map[event.code] = "Key" elif event.state == 0: but_not_set = False else: if event.state == 0: - print("Remaining buttons configuration skipped") + self.logger.info("Remaining buttons configuration skipped") end_flag = True break # Moves end_flag = False - print("Configuring moves") + self.logger.info("Configuring moves") moves_list = ["UP", "RIGHT", "DOWN", "LEFT"] for idx, move in enumerate(moves_list): @@ -498,7 +496,7 @@ def configure(self): if end_flag: break - print("Press {} arrow (SELECT / START to skip)".format(move)) + self.logger.info("Press {} arrow (SELECT / START to skip)".format(move)) but_not_set = True @@ -511,20 +509,20 @@ def configure(self): if event.ev_type == "Key": if (event.code != self.start_code and event.code != self.select_code): if event.state > 0: - print("Move {}, event code = {}".format(move, event.code)) + self.logger.info("Move {}, event code = {}".format(move, event.code)) self.device_dict["Arrow"][event.code] = idx self.code_to_group_map[event.code] = "Arrow" elif event.state == 0: but_not_set = False else: if event.state == 0: - print("Remaining buttons configuration skipped") + self.logger.info("Remaining buttons configuration skipped") end_flag = True break - print("Device dict : ") - print("Buttons (Keys) dict : ", self.device_dict["Key"]) - print("Arrows (Keys) dict : ", self.device_dict["Arrow"]) + self.logger.info("Device dict : ") + self.logger.info("Buttons (Keys) dict : ", self.device_dict["Key"]) + self.logger.info("Arrows (Keys) dict : ", self.device_dict["Arrow"]) input("Configuration completed, press Enter to continue.") @@ -651,40 +649,40 @@ def load_device_configuration(self): item[1], item[2], item[3], item[4]] config_found = True - print("Device configuration file found in: {}".format(os.path.dirname(self.device_config_file_path))) - print("Device configuration file loaded.") + self.logger.info("Device configuration file found in: {}".format(os.path.dirname(self.device_config_file_path))) + self.logger.info("Device configuration file loaded.") except: - print("Invalid device configuration file found in: {}".format(os.path.dirname(self.device_config_file_path))) + self.logger.info("Invalid device configuration file found in: {}".format(os.path.dirname(self.device_config_file_path))) if not config_found: - print("Configuration for this device not present in device configuration file") + self.logger.info("Configuration for this device not present in device configuration file") return config_found # Configure device buttons def configure(self): - print("") - print("") - print("Configuring device {}".format(self.device)) - print("") - print("# Buttons CFG file") - print(" _______ ") - print(" B7 __|digital|__ B8 ") - print(" B5 |buttons| B6 ") - print(" / \ ") - print(" B1 ") - print(" | SELECT START ") - print(" -- -- B4 B2") - print(" | - ") - print(" __/____ ( + ) B3 ") - print("|digital| - ") - print("| move | \______ ") - print(" ------- |analog| ") - print(" | move | ") - print(" ------ ") - print("") - print("NB: Be sure to have your analog switch on before starting.") - print("") + self.logger.info("") + self.logger.info("") + self.logger.info("Configuring device {}".format(self.device)) + self.logger.info("") + self.logger.info("# Buttons CFG file") + self.logger.info(" _______ ") + self.logger.info(" B7 __|digital|__ B8 ") + self.logger.info(" B5 |buttons| B6 ") + self.logger.info(" / \ ") + self.logger.info(" B1 ") + self.logger.info(" | SELECT START ") + self.logger.info(" -- -- B4 B2") + self.logger.info(" | - ") + self.logger.info(" __/____ ( + ) B3 ") + self.logger.info("|digital| - ") + self.logger.info("| move | \______ ") + self.logger.info(" ------- |analog| ") + self.logger.info(" | move | ") + self.logger.info(" ------ ") + self.logger.info("") + self.logger.info("NB: Be sure to have your analog switch on before starting.") + self.logger.info("") self.device_dict = {} self.device_dict["Key"] = defaultdict(lambda: 7) @@ -692,30 +690,30 @@ def configure(self): # Buttons configuration # Start and Select - print("Press START button") + self.logger.info("Press START button") but_not_set = True while but_not_set: for event in self.device.read(): if event.ev_type == "Key": if event.state == 1: self.start_code = event.code - print("Start associated with {}".format(event.code)) + self.logger.info("Start associated with {}".format(event.code)) else: but_not_set = False break - print("Press SELECT button (Start to skip)") + self.logger.info("Press SELECT button (Start to skip)") but_not_set = True while but_not_set: for event in self.device.read(): if event.ev_type == "Key": if event.code != self.start_code and event.state == 1: self.select_code = event.code - print("Select associated with {}".format(event.code)) + self.logger.info("Select associated with {}".format(event.code)) else: but_not_set = False if event.code == self.start_code: - print("Select association skipped") + self.logger.info("Select association skipped") break # Attack buttons @@ -725,7 +723,7 @@ def configure(self): if end_flag: break - print("Press B{} button (SELECT / START to end configuration)".format(idx+1)) + self.logger.info("Press B{} button (SELECT / START to end configuration)".format(idx+1)) but_not_set = True @@ -738,20 +736,20 @@ def configure(self): if event.ev_type == "Key": if (event.code != self.start_code and event.code != self.select_code): if event.state == 1: - print("Button B{}, event code = {}".format(idx+1, event.code)) + self.logger.info("Button B{}, event code = {}".format(idx+1, event.code)) self.device_dict["Key"][event.code] = idx elif event.state == 0: but_not_set = False else: if event.state == 0: - print("Remaining buttons configuration skipped") + self.logger.info("Remaining buttons configuration skipped") end_flag = True break # Move sticks # Digital end_flag = False - print("Configuring digital move") + self.logger.info("Configuring digital move") moves_list = ["UP", "RIGHT", "DOWN", "LEFT"] event_codes_list = ["Y", "X", "Y", "X"] self.device_dict["Absolute"]["ABS_HAT0Y"] = defaultdict(lambda: [ @@ -766,7 +764,7 @@ def configure(self): if end_flag: break - print("Press {} arrow (SELECT / START to skip)".format(move)) + self.logger.info("Press {} arrow (SELECT / START to skip)".format(move)) but_not_set = True @@ -780,16 +778,16 @@ def configure(self): if event.ev_type == "Absolute": if event.code == "ABS_HAT0" + event_codes_list[idx]: if abs(event.state) == 1: - print("{} move event code = {}, event state = {}".format( + self.logger.info("{} move event code = {}, event state = {}".format( move, event.code, event.state)) self.device_dict["Absolute"][event.code][event.state] = [ idx, abs(event.state)] elif event.state == 0: but_not_set = False else: - print("Digital Move Stick assumes not admissible values: {}".format( + self.logger.info("Digital Move Stick assumes not admissible values: {}".format( event.state)) - print( + self.logger.info( "Digital Move Stick not supported, configuration skipped") end_flag = True break @@ -797,13 +795,13 @@ def configure(self): if (event.code == self.start_code or event.code == self.select_code): if event.state == 0: - print("Digital Move Stick configuration skipped") + self.logger.info("Digital Move Stick configuration skipped") end_flag = True break # Move sticks # Analog - print("Configuring analog move") + self.logger.info("Configuring analog move") moves_list = ["UP", "RIGHT", "DOWN", "LEFT"] event_codes_list = ["Y", "X", "Y", "X"] self.max_analog_val = {} @@ -811,8 +809,7 @@ def configure(self): for idx, move in enumerate(moves_list): - print( - "Move left analog in {} position, keep it there and press Start".format(move)) + self.logger.info("Move left analog in {} position, keep it there and press Start".format(move)) but_not_set = True @@ -846,17 +843,17 @@ def configure(self): self.origin_analog_val[moves_list[idx]]) * thresh_perc self.delta_perc[idx+2] = -self.delta_perc[idx] - print("Delta perc = ", self.delta_perc) + self.logger.info("Delta perc = ", self.delta_perc) # Addressing Y-X axis for idx in range(2): - print("{} move event code = ABS_{}, ".format(moves_list[idx], + self.logger.info("{} move event code = ABS_{}, ".format(moves_list[idx], event_codes_list[idx]) + "event state = {}".format(self.max_analog_val[moves_list[idx]])) - print("{} move event code = ABS_{}, ".format(moves_list[idx+2], + self.logger.info("{} move event code = ABS_{}, ".format(moves_list[idx+2], event_codes_list[idx+2]) + " event state = {}".format(self.max_analog_val[moves_list[idx+2]])) - print("NO {}-{} move event code =".format(moves_list[idx], + self.logger.info("NO {}-{} move event code =".format(moves_list[idx], moves_list[idx+2]) + " ABS_{}, ".format(event_codes_list[idx]) + "event state = {}".format(self.origin_analog_val[moves_list[idx]])) @@ -875,14 +872,13 @@ def configure(self): [[idx], 1], [[idx, idx+2], 0], [[idx+2], 1]] else: - print( - "Not admissible values found in analog stick configuration, skipping") + self.logger.info("Not admissible values found in analog stick configuration, skipping") - print("device dict : ") - print("Buttons (Keys) dict : ", self.device_dict["Key"]) - print("Moves (Absolute) dict : ", self.device_dict["Absolute"]) + self.logger.info("device dict : ") + self.logger.info("Buttons (Keys) dict : ", self.device_dict["Key"]) + self.logger.info("Moves (Absolute) dict : ", self.device_dict["Absolute"]) - print("Configuration completed.") + self.logger.info("Configuration completed.") return @@ -931,7 +927,6 @@ def run(self): # run is a default Thread function if __name__ == "__main__": - print("\nWhat do you want to do:") print(" 1 - Show device events") print(" 2 - Configure device") diff --git a/setup.py b/setup.py index 3f344618..46287f9a 100644 --- a/setup.py +++ b/setup.py @@ -13,8 +13,8 @@ 'core': [], 'tests': ['pytest', 'pytest-mock', 'testresources'], 'stable-baselines': ['stable-baselines==2.10.2', 'gym<=0.21.0', "protobuf==3.20.1", "pyyaml"], - 'stable-baselines3': ['stable-baselines3[extra]==2.1.0', "pyyaml"], - 'ray-rllib': ['ray[rllib]==2.6.3', 'tensorflow', 'torch', "pyyaml"], + 'stable-baselines3': ['stable-baselines3[extra]==2.1.*', "pyyaml"], + 'ray-rllib': ['ray[rllib]==2.6.*', 'tensorflow', 'torch', "pyyaml"], } # NOTE Package data is inside MANIFEST.In