From b330620510f0dcb1d8ed19377d13fb2ac9442940 Mon Sep 17 00:00:00 2001 From: satos Date: Thu, 5 Sep 2024 17:36:24 +0900 Subject: [PATCH 1/4] introduce ruff formatter and linter --- .github/workflows/ruff.yml | 13 +++++++++++++ .ruff.toml | 5 +++++ README.md | 3 +++ requirements.txt | 1 + 4 files changed, 22 insertions(+) create mode 100644 .github/workflows/ruff.yml create mode 100644 .ruff.toml diff --git a/.github/workflows/ruff.yml b/.github/workflows/ruff.yml new file mode 100644 index 0000000..f5615d2 --- /dev/null +++ b/.github/workflows/ruff.yml @@ -0,0 +1,13 @@ +name: Ruff +on: [push, pull_request] +jobs: + ruff: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + # run ruff format + - uses: chartboost/ruff-action@v1 + with: + args: 'format --check' + # run ruff check + - uses: chartboost/ruff-action@v1 diff --git a/.ruff.toml b/.ruff.toml new file mode 100644 index 0000000..74e9be0 --- /dev/null +++ b/.ruff.toml @@ -0,0 +1,5 @@ +exclude = [ + "AALpy/", + "spot-*/", + ".venv/", +] diff --git a/README.md b/README.md index d93b319..dfc119d 100644 --- a/README.md +++ b/README.md @@ -152,5 +152,8 @@ If you want to run ProbBBC without strategy-guided equivalence testing (for exam python3 src/main.py --model-file benchmarks/mqtt/mqtt.dot --prop-file benchmarks/mqtt/mqtt.props --prism-path /usr/bin/prism --output-dir results --min-rounds 100 --max-rounds 120 --save-files-for-each-round --target-unambiguity 0.99 ``` +### Formatter +Please run `ruff check && ruff format` and apply the changes before making commit. + ### License This software is released under the BSD-2 License. See LICENSE file for details. diff --git a/requirements.txt b/requirements.txt index 20ab45a..b34ce12 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ numpy==1.24.2 pydot==1.4.2 pyparsing==3.0.9 scipy==1.10.1 +ruff==0.6.3 \ No newline at end of file From 800ac942b98aee3f4508253db17aa0d95ff385d1 Mon Sep 17 00:00:00 2001 From: satos Date: Thu, 5 Sep 2024 17:39:40 +0900 Subject: [PATCH 2/4] apply ruff format --- .../grid_world/grid_world.py | 354 +++++++++++---- misc/MV_python_integrator.py | 258 +++++------ misc/dfa_lean.py | 19 +- misc/learn_then_model_check.py | 71 ++- misc/mdp_lean.py | 32 +- misc/mdp_model_check.py | 74 +++- misc/smc_uniform.py | 78 ++-- misc/weired_coffee.py | 24 +- src/PrismModelConverter.py | 13 +- src/ProbBlackBoxChecking.py | 411 +++++++++++++----- src/Smc.py | 55 ++- src/StrategyBridge.py | 39 +- src/eval_each_round.py | 58 ++- src/main.py | 153 +++++-- src/prism_export_to_dot_model.py | 30 +- src/tests/Smc_test.py | 45 +- src/tests/StrategyBridge_test.py | 108 ++--- stat/scripts/plot_graph.py | 168 ++++--- 18 files changed, 1363 insertions(+), 627 deletions(-) diff --git a/benchmarks/random_grid_world/grid_world/grid_world.py b/benchmarks/random_grid_world/grid_world/grid_world.py index 0ed99b2..fc1a5ac 100644 --- a/benchmarks/random_grid_world/grid_world/grid_world.py +++ b/benchmarks/random_grid_world/grid_world/grid_world.py @@ -5,19 +5,19 @@ class Actions(IntEnum): - North = 0, # Moving up on the grid - South = 1, # Moving down on the grid - West = 2, # Moving left on the grid + North = (0,) # Moving up on the grid + South = (1,) # Moving down on the grid + West = (2,) # Moving left on the grid East = 3 # Moving right on the grid class Observations(IntEnum): - Concrete = 0, # Represents a normal, navigable cell - Hole = 1, # Represents a hole cell where movement stops - Wall = 2, # Represents an encountered wall, i.e., a border of the grid - Goal = 3, # Represents a goal cell - Mud = 4, - Grass = 5, + Concrete = (0,) # Represents a normal, navigable cell + Hole = (1,) # Represents a hole cell where movement stops + Wall = (2,) # Represents an encountered wall, i.e., a border of the grid + Goal = (3,) # Represents a goal cell + Mud = (4,) + Grass = (5,) Sand = 6 @@ -63,8 +63,17 @@ class GridWorld: Class to represent a grid world environment """ - def __init__(self, x_size: int, y_size: int, hole_ratio: float = 0.1, num_goal: int = 1, mud_ratio: float = 0.2, - grass_ratio: float = 0.2, sand_ratio: float = 0.2, seed: Optional[int] = None): + def __init__( + self, + x_size: int, + y_size: int, + hole_ratio: float = 0.1, + num_goal: int = 1, + mud_ratio: float = 0.2, + grass_ratio: float = 0.2, + sand_ratio: float = 0.2, + seed: Optional[int] = None, + ): """ Randomly construct a grid world. @@ -90,7 +99,7 @@ def __init__(self, x_size: int, y_size: int, hole_ratio: float = 0.1, num_goal: Observations.Goal: 1.0, Observations.Mud: 0.6, Observations.Grass: 0.8, - Observations.Sand: 0.75 + Observations.Sand: 0.75, } self.mud = [] self.grass = [] @@ -99,17 +108,29 @@ def __init__(self, x_size: int, y_size: int, hole_ratio: float = 0.1, num_goal: # Randomly decide holes. The initial state should not be a hole self.holes = [] for _ in range(int(x_size * y_size * hole_ratio)): - x_hole, y_hole = random.randint(0, x_size - 1), random.randint(0, y_size - 1) - if (x_hole, y_hole) not in self.holes and (x_hole, y_hole) != (self.x_init, self.y_init): + x_hole, y_hole = ( + random.randint(0, x_size - 1), + random.randint(0, y_size - 1), + ) + if (x_hole, y_hole) not in self.holes and (x_hole, y_hole) != ( + self.x_init, + self.y_init, + ): self.holes.append((x_hole, y_hole)) # Randomly decide goals. Any goal must not be a hole. The initial state should not be a hole, either. self.goals = [] while len(self.goals) < num_goal: while True: - x_goal, y_goal = random.randint(0, x_size - 1), random.randint(0, y_size - 1) - if (x_goal, y_goal) not in self.goals and (x_goal, y_goal) not in self.holes and (x_goal, y_goal) != ( - self.x_init, self.y_init): + x_goal, y_goal = ( + random.randint(0, x_size - 1), + random.randint(0, y_size - 1), + ) + if ( + (x_goal, y_goal) not in self.goals + and (x_goal, y_goal) not in self.holes + and (x_goal, y_goal) != (self.x_init, self.y_init) + ): self.goals.append((x_goal, y_goal)) break @@ -117,20 +138,51 @@ def __init__(self, x_size: int, y_size: int, hole_ratio: float = 0.1, num_goal: # either. A state next to a mud state should not be a mud state for _ in range(int(x_size * y_size * mud_ratio)): while True: - x_mud, y_mud = random.randint(0, x_size - 1), random.randint(0, y_size - 1) - if (x_mud, y_mud) not in self.holes and (x_mud, y_mud) not in self.goals and (x_mud, y_mud) != ( - self.x_init, self.y_init) and self.to_observation(x_mud, y_mud) == Observations.Concrete: - if self.noise_probability[self.to_observation(x_mud - 1, y_mud)] < 1.0: - if (x_mud - 1, y_mud) in self.mud or (x_mud - 2, y_mud) in self.mud: + x_mud, y_mud = ( + random.randint(0, x_size - 1), + random.randint(0, y_size - 1), + ) + if ( + (x_mud, y_mud) not in self.holes + and (x_mud, y_mud) not in self.goals + and (x_mud, y_mud) != (self.x_init, self.y_init) + and self.to_observation(x_mud, y_mud) == Observations.Concrete + ): + if ( + self.noise_probability[self.to_observation(x_mud - 1, y_mud)] + < 1.0 + ): + if (x_mud - 1, y_mud) in self.mud or ( + x_mud - 2, + y_mud, + ) in self.mud: break - if self.noise_probability[self.to_observation(x_mud + 1, y_mud)] < 1.0: - if (x_mud + 1, y_mud) in self.mud or (x_mud + 2, y_mud) in self.mud: + if ( + self.noise_probability[self.to_observation(x_mud + 1, y_mud)] + < 1.0 + ): + if (x_mud + 1, y_mud) in self.mud or ( + x_mud + 2, + y_mud, + ) in self.mud: break - if self.noise_probability[self.to_observation(x_mud, y_mud - 1)] < 1.0: - if (x_mud, y_mud - 1) in self.mud or (x_mud, y_mud - 2) in self.mud: + if ( + self.noise_probability[self.to_observation(x_mud, y_mud - 1)] + < 1.0 + ): + if (x_mud, y_mud - 1) in self.mud or ( + x_mud, + y_mud - 2, + ) in self.mud: break - if self.noise_probability[self.to_observation(x_mud, y_mud + 1)] < 1.0: - if (x_mud, y_mud + 1) in self.mud or (x_mud, y_mud + 2) in self.mud: + if ( + self.noise_probability[self.to_observation(x_mud, y_mud + 1)] + < 1.0 + ): + if (x_mud, y_mud + 1) in self.mud or ( + x_mud, + y_mud + 2, + ) in self.mud: break self.mud.append((x_mud, y_mud)) break @@ -139,21 +191,59 @@ def __init__(self, x_size: int, y_size: int, hole_ratio: float = 0.1, num_goal: # either. A state next to a grass state should not be a grass state for _ in range(int(x_size * y_size * grass_ratio)): while True: - x_grass, y_grass = random.randint(0, x_size - 1), random.randint(0, y_size - 1) - if (x_grass, y_grass) not in self.holes and (x_grass, y_grass) not in self.goals and ( - x_grass, y_grass) != (self.x_init, self.y_init) and self.to_observation(x_grass, y_grass) == \ - Observations.Concrete: - if self.noise_probability[self.to_observation(x_grass - 1, y_grass)] < 1.0: - if (x_grass - 1, y_grass) in self.grass or (x_grass - 2, y_grass) in self.grass: + x_grass, y_grass = ( + random.randint(0, x_size - 1), + random.randint(0, y_size - 1), + ) + if ( + (x_grass, y_grass) not in self.holes + and (x_grass, y_grass) not in self.goals + and (x_grass, y_grass) != (self.x_init, self.y_init) + and self.to_observation(x_grass, y_grass) == Observations.Concrete + ): + if ( + self.noise_probability[ + self.to_observation(x_grass - 1, y_grass) + ] + < 1.0 + ): + if (x_grass - 1, y_grass) in self.grass or ( + x_grass - 2, + y_grass, + ) in self.grass: break - if self.noise_probability[self.to_observation(x_grass + 1, y_grass)] < 1.0: - if (x_grass + 1, y_grass) in self.grass or (x_grass + 2, y_grass) in self.grass: + if ( + self.noise_probability[ + self.to_observation(x_grass + 1, y_grass) + ] + < 1.0 + ): + if (x_grass + 1, y_grass) in self.grass or ( + x_grass + 2, + y_grass, + ) in self.grass: break - if self.noise_probability[self.to_observation(x_grass, y_grass - 1)] < 1.0: - if (x_grass, y_grass - 1) in self.grass or (x_grass, y_grass - 2) in self.grass: + if ( + self.noise_probability[ + self.to_observation(x_grass, y_grass - 1) + ] + < 1.0 + ): + if (x_grass, y_grass - 1) in self.grass or ( + x_grass, + y_grass - 2, + ) in self.grass: break - if self.noise_probability[self.to_observation(x_grass, y_grass + 1)] < 1.0: - if (x_grass, y_grass + 1) in self.grass or (x_grass, y_grass + 2) in self.grass: + if ( + self.noise_probability[ + self.to_observation(x_grass, y_grass + 1) + ] + < 1.0 + ): + if (x_grass, y_grass + 1) in self.grass or ( + x_grass, + y_grass + 2, + ) in self.grass: break self.grass.append((x_grass, y_grass)) break @@ -162,21 +252,51 @@ def __init__(self, x_size: int, y_size: int, hole_ratio: float = 0.1, num_goal: # either. A state next to a sand state should not be a sand state for _ in range(int(x_size * y_size * sand_ratio)): while True: - x_sand, y_sand = random.randint(0, x_size - 1), random.randint(0, y_size - 1) - if (x_sand, y_sand) not in self.holes and (x_sand, y_sand) not in self.goals and ( - x_sand, y_sand) != (self.x_init, self.y_init) and self.to_observation(x_sand, y_sand) == \ - Observations.Concrete: - if self.noise_probability[self.to_observation(x_sand - 1, y_sand)] < 1.0: - if (x_sand - 1, y_sand) in self.sand or (x_sand - 2, y_sand) in self.sand: + x_sand, y_sand = ( + random.randint(0, x_size - 1), + random.randint(0, y_size - 1), + ) + if ( + (x_sand, y_sand) not in self.holes + and (x_sand, y_sand) not in self.goals + and (x_sand, y_sand) != (self.x_init, self.y_init) + and self.to_observation(x_sand, y_sand) == Observations.Concrete + ): + if ( + self.noise_probability[self.to_observation(x_sand - 1, y_sand)] + < 1.0 + ): + if (x_sand - 1, y_sand) in self.sand or ( + x_sand - 2, + y_sand, + ) in self.sand: break - if self.noise_probability[self.to_observation(x_sand + 1, y_sand)] < 1.0: - if (x_sand + 1, y_sand) in self.sand or (x_sand + 2, y_sand) in self.sand: + if ( + self.noise_probability[self.to_observation(x_sand + 1, y_sand)] + < 1.0 + ): + if (x_sand + 1, y_sand) in self.sand or ( + x_sand + 2, + y_sand, + ) in self.sand: break - if self.noise_probability[self.to_observation(x_sand, y_sand - 1)] < 1.0: - if (x_sand, y_sand - 1) in self.sand or (x_sand, y_sand - 2) in self.sand: + if ( + self.noise_probability[self.to_observation(x_sand, y_sand - 1)] + < 1.0 + ): + if (x_sand, y_sand - 1) in self.sand or ( + x_sand, + y_sand - 2, + ) in self.sand: break - if self.noise_probability[self.to_observation(x_sand, y_sand + 1)] < 1.0: - if (x_sand, y_sand + 1) in self.sand or (x_sand, y_sand + 2) in self.sand: + if ( + self.noise_probability[self.to_observation(x_sand, y_sand + 1)] + < 1.0 + ): + if (x_sand, y_sand + 1) in self.sand or ( + x_sand, + y_sand + 2, + ) in self.sand: break self.sand.append((x_sand, y_sand)) break @@ -193,11 +313,11 @@ def to_prism(self) -> str: def to_header(self) -> str: # Function to generate the PRISM code header, which includes state variables and their initialization - result = 'mdp\n' - result += 'module random_grid_world\n' - result += f' x : [0..{self.x_size}] init {self.x_init};\n' - result += f' y : [0..{self.y_size}] init {self.y_init};\n' - result += ' output : [0..6] init 0;\n' + result = "mdp\n" + result += "module random_grid_world\n" + result += f" x : [0..{self.x_size}] init {self.x_init};\n" + result += f" y : [0..{self.y_size}] init {self.y_init};\n" + result += " output : [0..6] init 0;\n" return result def to_state(self, x: int, y: int) -> str: @@ -212,9 +332,13 @@ def to_state(self, x: int, y: int) -> str: for action in Actions: transitions = self.make_next(x, y, action) transitions_str = " + ".join( - [next_str(prob, next_x, next_y, obs) for prob, next_x, next_y, obs in transitions]) + [ + next_str(prob, next_x, next_y, obs) + for prob, next_x, next_y, obs in transitions + ] + ) lines.append(f" [{action.name}] (x={x}) & (y={y}) -> {transitions_str};") - return "\n".join(lines) + '\n\n' + return "\n".join(lines) + "\n\n" def to_observation(self, x: int, y: int) -> Observations: if (x, y) in self.holes: @@ -231,7 +355,9 @@ def to_observation(self, x: int, y: int) -> Observations: observation = Observations.Concrete return observation - def make_next(self, x: int, y: int, action: Actions) -> List[Tuple[float, int, int, Observations]]: + def make_next( + self, x: int, y: int, action: Actions + ) -> List[Tuple[float, int, int, Observations]]: """ Generate the possible next states given current position (x, y) and action. @@ -256,16 +382,33 @@ def make_next(self, x: int, y: int, action: Actions) -> List[Tuple[float, int, i the transition probability, next_x, next_y, and the corresponding observation """ if (x, y) in self.holes or (x, y) in self.goals: - return [(1.0, x, y, Observations.Hole if (x, y) in self.holes else Observations.Goal)] + return [ + ( + 1.0, + x, + y, + Observations.Hole if (x, y) in self.holes else Observations.Goal, + ) + ] next_states = [] - action_to_delta = {Actions.North: (0, -1), Actions.South: (0, 1), Actions.West: (-1, 0), Actions.East: (1, 0)} + action_to_delta = { + Actions.North: (0, -1), + Actions.South: (0, 1), + Actions.West: (-1, 0), + Actions.East: (1, 0), + } dx, dy = action_to_delta[action] target_x, target_y = x + dx, y + dy # If the target state is a wall, stay in the current state - if target_x < 0 or target_x >= self.x_size or target_y < 0 or target_y >= self.y_size: + if ( + target_x < 0 + or target_x >= self.x_size + or target_y < 0 + or target_y >= self.y_size + ): next_states.append((1.0, x, y, Observations.Wall)) return next_states @@ -273,16 +416,26 @@ def make_next(self, x: int, y: int, action: Actions) -> List[Tuple[float, int, i # Define the list of target states and their corresponding observations targets = [(target_x, target_y)] - if (target_x, target_y) not in self.holes and (target_x, target_y) not in self.goals and\ - self.noise_probability[target_observation] < 1.0: + if ( + (target_x, target_y) not in self.holes + and (target_x, target_y) not in self.goals + and self.noise_probability[target_observation] < 1.0 + ): if self.to_observation(target_x - dx, target_y - dy) != target_observation: targets.append((target_x - dx, target_y - dy)) - if self.to_observation(target_x + dx, target_y + dy) != target_observation and self.to_observation( - target_x - dx, target_y - dy) != self.to_observation(target_x + dx, target_y + dy): + if self.to_observation( + target_x + dx, target_y + dy + ) != target_observation and self.to_observation( + target_x - dx, target_y - dy + ) != self.to_observation(target_x + dx, target_y + dy): targets.append((target_x + dx, target_y + dy)) # Filter out wall states from the list of targets - targets = [(tx, ty) for tx, ty in targets if 0 <= tx < self.x_size and 0 <= ty < self.y_size] + targets = [ + (tx, ty) + for tx, ty in targets + if 0 <= tx < self.x_size and 0 <= ty < self.y_size + ] for tx, ty in targets: # Determine the observation of the target state @@ -292,13 +445,15 @@ def make_next(self, x: int, y: int, action: Actions) -> List[Tuple[float, int, i if (tx, ty) == (target_x, target_y): probability = self.noise_probability[target_observation] else: - probability = (1.0 - self.noise_probability[target_observation]) / (len(targets) - 1) + probability = (1.0 - self.noise_probability[target_observation]) / ( + len(targets) - 1 + ) next_states.append((probability, tx, ty, observation)) return next_states - footer = 'endmodule\n' + footer = "endmodule\n" # The positions of holes holes: List[Tuple[int, int]] # The positions of goals @@ -310,20 +465,55 @@ def main(): The entry point for grid world generation. It prints the resulting grid world generated by the parameters given as arguments. """ - parser = argparse.ArgumentParser(description='Generate a grid world.') - parser.add_argument('--x_size', type=int, required=True, help='Number of columns in the grid world') - parser.add_argument('--y_size', type=int, required=True, help='Number of rows in the grid world') - parser.add_argument('--num_goal', type=int, default=1, help='Number of goals in the grid world') - parser.add_argument('--hole_ratio', type=float, default=0.1, - help='Ratio of the number of hole cells to the total number of cells in the grid world') - parser.add_argument('--mud_ratio', type=float, default=0.3, help='The ratio of the number of mud cells') - parser.add_argument('--grass_ratio', type=float, default=0.3, help='The ratio of the number of grass cells') - parser.add_argument('--sand_ratio', type=float, default=0.3, help='The ratio of the number of sand cells') - parser.add_argument('--seed', type=int, default=None, help='Random seed to ensure reproducibility') + parser = argparse.ArgumentParser(description="Generate a grid world.") + parser.add_argument( + "--x_size", type=int, required=True, help="Number of columns in the grid world" + ) + parser.add_argument( + "--y_size", type=int, required=True, help="Number of rows in the grid world" + ) + parser.add_argument( + "--num_goal", type=int, default=1, help="Number of goals in the grid world" + ) + parser.add_argument( + "--hole_ratio", + type=float, + default=0.1, + help="Ratio of the number of hole cells to the total number of cells in the grid world", + ) + parser.add_argument( + "--mud_ratio", + type=float, + default=0.3, + help="The ratio of the number of mud cells", + ) + parser.add_argument( + "--grass_ratio", + type=float, + default=0.3, + help="The ratio of the number of grass cells", + ) + parser.add_argument( + "--sand_ratio", + type=float, + default=0.3, + help="The ratio of the number of sand cells", + ) + parser.add_argument( + "--seed", type=int, default=None, help="Random seed to ensure reproducibility" + ) args = parser.parse_args() - grid_world = GridWorld(args.x_size, args.y_size, args.hole_ratio, args.num_goal, args.mud_ratio, args.grass_ratio, - args.sand_ratio, args.seed) + grid_world = GridWorld( + args.x_size, + args.y_size, + args.hole_ratio, + args.num_goal, + args.mud_ratio, + args.grass_ratio, + args.sand_ratio, + args.seed, + ) print(grid_world.to_prism()) diff --git a/misc/MV_python_integrator.py b/misc/MV_python_integrator.py index 97467de..3227329 100644 --- a/misc/MV_python_integrator.py +++ b/misc/MV_python_integrator.py @@ -11,135 +11,149 @@ from aalpy.utils import load_automaton_from_file from StrategyBridge import StrategyBridge -socket_path = '/tmp/multivesta.sock' -example = 'shared_coin' +socket_path = "/tmp/multivesta.sock" +example = "shared_coin" + class MyModel(ABC): - def __init__(self): - print("MV_python_integrator setup\n") - mdp = load_automaton_from_file(f'/Users/bo40/workspace/python/AALpy/DotModels/MDPs/{example}.dot', automaton_type='mdp') - self.sul = MdpSUL(mdp) - # == デバッグ用 == - self.prism_model_path = f'/Users/bo40/workspace/python/mc_exp.prism' - self.prism_adv_path = f'/Users/bo40/workspace/python/adv.tra' - # ==== - self.strategy_bridge = StrategyBridge(self.prism_adv_path, self.prism_model_path) - self.number_of_steps = 0 - self.current_output = None - self.exec_trace = [] - self.cex_candidate = [] - self.prob_bbc_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.prob_bbc_socket.connect(socket_path) - self.prob_bbc_socket.send(pickle.dumps({"print": "Initialized MyModel (MV_python_integrator.py)\n"})) - - def setSimulatorForNewSimulation(self): - self.number_of_steps = 0 - self.current_output = self.sul.pre() - self.strategy_bridge.reset() - if len(self.exec_trace) > 0: - self.prob_bbc_socket.send(pickle.dumps({"trace" : self.exec_trace})) - self.exec_trace = [] - self.cex_candidate = [] - - def one_step(self): - self.number_of_steps += 1 - """Next step is determined based on transition probabilities of the current state. + def __init__(self): + print("MV_python_integrator setup\n") + mdp = load_automaton_from_file( + f"/Users/bo40/workspace/python/AALpy/DotModels/MDPs/{example}.dot", + automaton_type="mdp", + ) + self.sul = MdpSUL(mdp) + # == デバッグ用 == + self.prism_model_path = f"/Users/bo40/workspace/python/mc_exp.prism" + self.prism_adv_path = f"/Users/bo40/workspace/python/adv.tra" + # ==== + self.strategy_bridge = StrategyBridge( + self.prism_adv_path, self.prism_model_path + ) + self.number_of_steps = 0 + self.current_output = None + self.exec_trace = [] + self.cex_candidate = [] + self.prob_bbc_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.prob_bbc_socket.connect(socket_path) + self.prob_bbc_socket.send( + pickle.dumps({"print": "Initialized MyModel (MV_python_integrator.py)\n"}) + ) + + def setSimulatorForNewSimulation(self): + self.number_of_steps = 0 + self.current_output = self.sul.pre() + self.strategy_bridge.reset() + if len(self.exec_trace) > 0: + self.prob_bbc_socket.send(pickle.dumps({"trace": self.exec_trace})) + self.exec_trace = [] + self.cex_candidate = [] + + def one_step(self): + self.number_of_steps += 1 + """Next step is determined based on transition probabilities of the current state. Args: letter: input Returns: output of the current state """ - # strategyから次のアクションを決め、SULを実行する - action = self.strategy_bridge.next_action() - self.current_output = self.sul.step(action) - # 実行列を保存 - self.exec_trace.append(action) - self.exec_trace.append(self.current_output) - ret = self.strategy_bridge.update_state(action, self.current_output) - - # self.prob_bbc_socket.send(pickle.dumps({"print": f'one_step: {action}, {self.current_output}, {self.exec_trace}'})) - - if not ret: - if len(self.cex_candidate) == 0: - self.cex_candidate = copy.deepcopy(self.exec_trace) - # ProbBBCのメインプロセスに反例データを送る - self.prob_bbc_socket.send(pickle.dumps({"cex": self.cex_candidate})) - - return self.current_output - - def get_time(self): - return self.number_of_steps - - def eval(self, observation): - if self.current_output == None: - return 0 - else: - # MDPの出力は次のような文字列 'agree__six__c1_tails__c2_tails' - if observation in self.current_output.split('__'): - return 1 - else: - return 0 - + # strategyから次のアクションを決め、SULを実行する + action = self.strategy_bridge.next_action() + self.current_output = self.sul.step(action) + # 実行列を保存 + self.exec_trace.append(action) + self.exec_trace.append(self.current_output) + ret = self.strategy_bridge.update_state(action, self.current_output) + + # self.prob_bbc_socket.send(pickle.dumps({"print": f'one_step: {action}, {self.current_output}, {self.exec_trace}'})) + + if not ret: + if len(self.cex_candidate) == 0: + self.cex_candidate = copy.deepcopy(self.exec_trace) + # ProbBBCのメインプロセスに反例データを送る + self.prob_bbc_socket.send(pickle.dumps({"cex": self.cex_candidate})) + + return self.current_output + + def get_time(self): + return self.number_of_steps + + def eval(self, observation): + if self.current_output == None: + return 0 + else: + # MDPの出力は次のような文字列 'agree__six__c1_tails__c2_tails' + if observation in self.current_output.split("__"): + return 1 + else: + return 0 class SimulationWrapper(object): - - # constructor for Python - def __init__(self, model): - self.model = model - - # code to let multivesta initialize the simulator for a new simulation - # that is, re-initialize the model to its initial state, and set the - # new random seed - def setSimulatorForNewSimulation(self, random_seed): - self.model.setSimulatorForNewSimulation() - #Here you should replace 'setSimulatorForNewSimulation(random_seed)' - # with a method of your model to - #- set the new nuovo random seed - #- reset the status of the model to the initial one - # - - # code to let multivesta ask the simulator to perform a step of simulation - def performOneStepOfSimulation(self): - self.model.one_step() - #Here you should replace 'one_step()' with - # your method to perform a step of simulation - - # code to let multivesta ask the simulator to perform a - # "whole simulation" - # (i.e., until a stopping condition is found by the simulator) - def performWholeSimulation(self): - print('Not supported\n') - - # code to let multivesta ask the simulator the current simulated time - def getTime(self): - return float(self.model.get_time()) - #Here you should replace 'getTime()' with - # your method to get the current simulation time - # (or number of simulation step) - - - # code to let multivesta ask the simulator to return the value of the - # specified observation in the current state of the simulation - def rval(self, observation): - return float(self.model.eval(observation)) - #Here you should replace 'eval(observation)' with - # your method to evaluate an observation in the - # current simulation step - - class Java: - implements = ['vesta.python.IPythonSimulatorWrapper'] - - -if __name__ == '__main__': - porta = int(sys.argv[1]) - callback_porta = int(sys.argv[2]) - print('Python engine: expecting connection with java on port: '+str(porta)+' and callback connection on port '+str(callback_porta)) - gateway = JavaGateway(start_callback_server=True,gateway_parameters=GatewayParameters(port=porta),callback_server_parameters=CallbackServerParameters(port=callback_porta)) - - #Here you should put any initialization code you need to create an instance of - #your model_file_name class - - model=MyModel() - - gateway.entry_point.playWithState(SimulationWrapper(model)) \ No newline at end of file + # constructor for Python + def __init__(self, model): + self.model = model + + # code to let multivesta initialize the simulator for a new simulation + # that is, re-initialize the model to its initial state, and set the + # new random seed + def setSimulatorForNewSimulation(self, random_seed): + self.model.setSimulatorForNewSimulation() + # Here you should replace 'setSimulatorForNewSimulation(random_seed)' + # with a method of your model to + # - set the new nuovo random seed + # - reset the status of the model to the initial one + # + + # code to let multivesta ask the simulator to perform a step of simulation + def performOneStepOfSimulation(self): + self.model.one_step() + # Here you should replace 'one_step()' with + # your method to perform a step of simulation + + # code to let multivesta ask the simulator to perform a + # "whole simulation" + # (i.e., until a stopping condition is found by the simulator) + def performWholeSimulation(self): + print("Not supported\n") + + # code to let multivesta ask the simulator the current simulated time + def getTime(self): + return float(self.model.get_time()) + # Here you should replace 'getTime()' with + # your method to get the current simulation time + # (or number of simulation step) + + # code to let multivesta ask the simulator to return the value of the + # specified observation in the current state of the simulation + def rval(self, observation): + return float(self.model.eval(observation)) + # Here you should replace 'eval(observation)' with + # your method to evaluate an observation in the + # current simulation step + + class Java: + implements = ["vesta.python.IPythonSimulatorWrapper"] + + +if __name__ == "__main__": + porta = int(sys.argv[1]) + callback_porta = int(sys.argv[2]) + print( + "Python engine: expecting connection with java on port: " + + str(porta) + + " and callback connection on port " + + str(callback_porta) + ) + gateway = JavaGateway( + start_callback_server=True, + gateway_parameters=GatewayParameters(port=porta), + callback_server_parameters=CallbackServerParameters(port=callback_porta), + ) + + # Here you should put any initialization code you need to create an instance of + # your model_file_name class + + model = MyModel() + + gateway.entry_point.playWithState(SimulationWrapper(model)) diff --git a/misc/dfa_lean.py b/misc/dfa_lean.py index 6a3729f..e10965a 100644 --- a/misc/dfa_lean.py +++ b/misc/dfa_lean.py @@ -1,11 +1,20 @@ -from aalpy.utils import load_automaton_from_file, save_automaton_to_file, visualize_automaton, generate_random_dfa +from aalpy.utils import ( + load_automaton_from_file, + save_automaton_to_file, + visualize_automaton, + generate_random_dfa, +) from aalpy.SULs import DfaSUL from aalpy.oracles import RandomWalkEqOracle from aalpy.learning_algs import run_Lstar # randomly generate a dfa -random_dfa = generate_random_dfa(alphabet=[1,2,3,4,5],num_states=20, num_accepting_states=8) -big_random_dfa = generate_random_dfa(alphabet=[1,2,3,4,5],num_states=2000, num_accepting_states=500) +random_dfa = generate_random_dfa( + alphabet=[1, 2, 3, 4, 5], num_states=20, num_accepting_states=8 +) +big_random_dfa = generate_random_dfa( + alphabet=[1, 2, 3, 4, 5], num_states=2000, num_accepting_states=500 +) # get input alphabet of the automaton alphabet = random_dfa.get_input_alphabet() @@ -19,10 +28,10 @@ eq_oracle = RandomWalkEqOracle(alphabet, sul, num_steps=5000, reset_prob=0.09) # start learning -learned_dfa = run_Lstar(alphabet, sul, eq_oracle, automaton_type='dfa') +learned_dfa = run_Lstar(alphabet, sul, eq_oracle, automaton_type="dfa") # save automaton to file and visualize it -save_automaton_to_file(learned_dfa, path='Learned_Automaton', file_type='dot') +save_automaton_to_file(learned_dfa, path="Learned_Automaton", file_type="dot") # visualize automaton visualize_automaton(learned_dfa) diff --git a/misc/learn_then_model_check.py b/misc/learn_then_model_check.py index b9d6b28..7d4ecf2 100644 --- a/misc/learn_then_model_check.py +++ b/misc/learn_then_model_check.py @@ -3,28 +3,65 @@ from aalpy.SULs import MdpSUL from aalpy.oracles import RandomWalkEqOracle, RandomWordEqOracle from aalpy.learning_algs import run_stochastic_Lstar -from aalpy.utils import visualize_automaton, load_automaton_from_file, model_check_experiment, get_properties_file, get_correct_prop_values, mdp_2_prism_format +from aalpy.utils import ( + visualize_automaton, + load_automaton_from_file, + model_check_experiment, + get_properties_file, + get_correct_prop_values, + mdp_2_prism_format, +) from aalpy.automata.StochasticMealyMachine import smm_to_mdp_conversion from ..src.PrismModelConverter import add_step_counter_to_prism_model -def learn_benchmark_mdp(example, automaton_type='smm', n_c=20, n_resample=1000, min_rounds=20, max_rounds=500, - strategy='normal', cex_processing='longest_prefix', stopping_based_on_prop=None, - samples_cex_strategy=None): + +def learn_benchmark_mdp( + example, + automaton_type="smm", + n_c=20, + n_resample=1000, + min_rounds=20, + max_rounds=500, + strategy="normal", + cex_processing="longest_prefix", + stopping_based_on_prop=None, + samples_cex_strategy=None, +): # Specify the path to the dot file containing a MDP - mdp = load_automaton_from_file(f'../AALpy/DotModels/MDPs/{example}.dot', automaton_type='mdp') + mdp = load_automaton_from_file( + f"../AALpy/DotModels/MDPs/{example}.dot", automaton_type="mdp" + ) input_alphabet = mdp.get_input_alphabet() sul = MdpSUL(mdp) - eq_oracle = RandomWordEqOracle(input_alphabet, sul, num_walks=100, min_walk_len=5, max_walk_len=15, - reset_after_cex=True) - eq_oracle = RandomWalkEqOracle(input_alphabet, sul=sul, num_steps=2000, reset_prob=0.25, - reset_after_cex=True) + eq_oracle = RandomWordEqOracle( + input_alphabet, + sul, + num_walks=100, + min_walk_len=5, + max_walk_len=15, + reset_after_cex=True, + ) + eq_oracle = RandomWalkEqOracle( + input_alphabet, sul=sul, num_steps=2000, reset_prob=0.25, reset_after_cex=True + ) - learned_mdp = run_stochastic_Lstar(input_alphabet=input_alphabet, eq_oracle=eq_oracle, sul=sul, n_c=n_c, - n_resample=n_resample, min_rounds=min_rounds, max_rounds=max_rounds, - automaton_type=automaton_type, strategy=strategy, cex_processing=cex_processing, - samples_cex_strategy=samples_cex_strategy, target_unambiguity=0.99, - property_based_stopping=stopping_based_on_prop, print_level=3) + learned_mdp = run_stochastic_Lstar( + input_alphabet=input_alphabet, + eq_oracle=eq_oracle, + sul=sul, + n_c=n_c, + n_resample=n_resample, + min_rounds=min_rounds, + max_rounds=max_rounds, + automaton_type=automaton_type, + strategy=strategy, + cex_processing=cex_processing, + samples_cex_strategy=samples_cex_strategy, + target_unambiguity=0.99, + property_based_stopping=stopping_based_on_prop, + print_level=3, + ) return learned_mdp @@ -32,7 +69,7 @@ def learn_benchmark_mdp(example, automaton_type='smm', n_c=20, n_resample=1000, aalpy.paths.path_to_prism = "/Users/bo40/workspace/PRISM/prism/prism/bin/prism" aalpy.paths.path_to_properties = "../AALpy/Benchmarking/prism_eval_props/" -example = 'slot_machine' +example = "slot_machine" learned_model = learn_benchmark_mdp(example) if isinstance(learned_model, StochasticMealyMachine): @@ -42,10 +79,10 @@ def learn_benchmark_mdp(example, automaton_type='smm', n_c=20, n_resample=1000, prism_model_path = "learn-results/mc_exp.prism" converted_model_path = "learn-results/mc_exp.prism.convert" -mdp_2_prism_format(mdp, name='mc_exp', output_path=prism_model_path) +mdp_2_prism_format(mdp, name="mc_exp", output_path=prism_model_path) # PRISMのモデルにカウンタ変数を埋め込む add_step_counter_to_prism_model(prism_model_path, converted_model_path) # values, diff = model_check_experiment(get_properties_file(example), get_correct_prop_values(example), mdp) # print('Value for each property:', [round(d * 100, 2) for d in values.values()]) -# print('Error for each property:', [round(d * 100, 2) for d in diff.values()]) \ No newline at end of file +# print('Error for each property:', [round(d * 100, 2) for d in diff.values()]) diff --git a/misc/mdp_lean.py b/misc/mdp_lean.py index 667c2ae..ab3c190 100644 --- a/misc/mdp_lean.py +++ b/misc/mdp_lean.py @@ -3,17 +3,29 @@ from aalpy.learning_algs import run_stochastic_Lstar from aalpy.utils import visualize_automaton, generate_random_mdp -def random_mdp_lean(num_states, input_len, n_c=20, n_resample=1000, min_rounds=10, max_rounds=1000): - mdp, input_alphabet = generate_random_mdp(num_states, input_len) - visualize_automaton(mdp, path="sul") - sul = MdpSUL(mdp) - eq_oracle = RandomWalkEqOracle(input_alphabet, sul=sul, num_steps=5000, reset_prob=0.11, - reset_after_cex=True) - learned_mdp = run_stochastic_Lstar(input_alphabet, sul, eq_oracle, n_c=n_c, n_resample=n_resample, - min_rounds=min_rounds, max_rounds=max_rounds) +def random_mdp_lean( + num_states, input_len, n_c=20, n_resample=1000, min_rounds=10, max_rounds=1000 +): + mdp, input_alphabet = generate_random_mdp(num_states, input_len) + visualize_automaton(mdp, path="sul") + sul = MdpSUL(mdp) + eq_oracle = RandomWalkEqOracle( + input_alphabet, sul=sul, num_steps=5000, reset_prob=0.11, reset_after_cex=True + ) + + learned_mdp = run_stochastic_Lstar( + input_alphabet, + sul, + eq_oracle, + n_c=n_c, + n_resample=n_resample, + min_rounds=min_rounds, + max_rounds=max_rounds, + ) + + return learned_mdp - return learned_mdp learned = random_mdp_lean(10, 5) -visualize_automaton(learned) \ No newline at end of file +visualize_automaton(learned) diff --git a/misc/mdp_model_check.py b/misc/mdp_model_check.py index 92c9fcd..1280189 100644 --- a/misc/mdp_model_check.py +++ b/misc/mdp_model_check.py @@ -3,27 +3,63 @@ from aalpy.SULs import MdpSUL from aalpy.oracles import RandomWalkEqOracle, RandomWordEqOracle from aalpy.learning_algs import run_stochastic_Lstar -from aalpy.utils import visualize_automaton, load_automaton_from_file, model_check_experiment, get_properties_file, get_correct_prop_values +from aalpy.utils import ( + visualize_automaton, + load_automaton_from_file, + model_check_experiment, + get_properties_file, + get_correct_prop_values, +) from aalpy.automata.StochasticMealyMachine import smm_to_mdp_conversion -def learn_benchmark_mdp(example, automaton_type='smm', n_c=20, n_resample=1000, min_rounds=20, max_rounds=500, - strategy='normal', cex_processing='longest_prefix', stopping_based_on_prop=None, - samples_cex_strategy=None): + +def learn_benchmark_mdp( + example, + automaton_type="smm", + n_c=20, + n_resample=1000, + min_rounds=20, + max_rounds=500, + strategy="normal", + cex_processing="longest_prefix", + stopping_based_on_prop=None, + samples_cex_strategy=None, +): # Specify the path to the dot file containing a MDP - mdp = load_automaton_from_file(f'../../AALpy/DotModels/MDPs/{example}.dot', automaton_type='mdp') + mdp = load_automaton_from_file( + f"../../AALpy/DotModels/MDPs/{example}.dot", automaton_type="mdp" + ) input_alphabet = mdp.get_input_alphabet() sul = MdpSUL(mdp) - eq_oracle = RandomWordEqOracle(input_alphabet, sul, num_walks=100, min_walk_len=5, max_walk_len=15, - reset_after_cex=True) - eq_oracle = RandomWalkEqOracle(input_alphabet, sul=sul, num_steps=2000, reset_prob=0.25, - reset_after_cex=True) + eq_oracle = RandomWordEqOracle( + input_alphabet, + sul, + num_walks=100, + min_walk_len=5, + max_walk_len=15, + reset_after_cex=True, + ) + eq_oracle = RandomWalkEqOracle( + input_alphabet, sul=sul, num_steps=2000, reset_prob=0.25, reset_after_cex=True + ) - learned_mdp = run_stochastic_Lstar(input_alphabet=input_alphabet, eq_oracle=eq_oracle, sul=sul, n_c=n_c, - n_resample=n_resample, min_rounds=min_rounds, max_rounds=max_rounds, - automaton_type=automaton_type, strategy=strategy, cex_processing=cex_processing, - samples_cex_strategy=samples_cex_strategy, target_unambiguity=0.99, - property_based_stopping=stopping_based_on_prop, print_level=3) + learned_mdp = run_stochastic_Lstar( + input_alphabet=input_alphabet, + eq_oracle=eq_oracle, + sul=sul, + n_c=n_c, + n_resample=n_resample, + min_rounds=min_rounds, + max_rounds=max_rounds, + automaton_type=automaton_type, + strategy=strategy, + cex_processing=cex_processing, + samples_cex_strategy=samples_cex_strategy, + target_unambiguity=0.99, + property_based_stopping=stopping_based_on_prop, + print_level=3, + ) return learned_mdp @@ -31,7 +67,7 @@ def learn_benchmark_mdp(example, automaton_type='smm', n_c=20, n_resample=1000, aalpy.paths.path_to_prism = "/Users/bo40/workspace/PRISM/prism/prism/bin/prism" aalpy.paths.path_to_properties = "../../AALpy/Benchmarking/prism_eval_props/" -example = 'slot_machine' +example = "slot_machine" learned_model = learn_benchmark_mdp(example) if isinstance(learned_model, StochasticMealyMachine): @@ -39,7 +75,9 @@ def learn_benchmark_mdp(example, automaton_type='smm', n_c=20, n_resample=1000, else: mdp = learned_model -values, diff = model_check_experiment(get_properties_file(example), get_correct_prop_values(example), mdp) +values, diff = model_check_experiment( + get_properties_file(example), get_correct_prop_values(example), mdp +) -print('Value for each property:', [round(d * 100, 2) for d in values.values()]) -print('Error for each property:', [round(d * 100, 2) for d in diff.values()]) \ No newline at end of file +print("Value for each property:", [round(d * 100, 2) for d in values.values()]) +print("Error for each property:", [round(d * 100, 2) for d in diff.values()]) diff --git a/misc/smc_uniform.py b/misc/smc_uniform.py index 7b9ecdc..b2e0449 100644 --- a/misc/smc_uniform.py +++ b/misc/smc_uniform.py @@ -11,14 +11,17 @@ import spot import buddy + class StatisticalModelChecker: - def __init__(self, mdp_sut : SUL, actions, spec_path, num_exec=1000, max_exec_len=40): + def __init__( + self, mdp_sut: SUL, actions, spec_path, num_exec=1000, max_exec_len=40 + ): self.sut = mdp_sut self.actions = actions self.exec_trace = [] with open(spec_path) as f: spec = f.readline() - self.spec_monitor = spot.translate(spec, 'monitor', 'det') + self.spec_monitor = spot.translate(spec, "monitor", "det") self.bdict = self.spec_monitor.get_dict() self.spec_monitor_out = [] for s in range(0, self.spec_monitor.num_states()): @@ -26,7 +29,10 @@ def __init__(self, mdp_sut : SUL, actions, spec_path, num_exec=1000, max_exec_le self.ap_varnum = dict() for ap in self.spec_monitor.ap(): var = self.bdict.varnum(ap) - self.ap_varnum[ap.to_str()] = (buddy.bdd_ithvar(var), buddy.bdd_nithvar(var)) + self.ap_varnum[ap.to_str()] = ( + buddy.bdd_ithvar(var), + buddy.bdd_nithvar(var), + ) self.num_exec = num_exec self.max_exec_len = max_exec_len self.exec_sample = [] @@ -56,17 +62,24 @@ def run(self): if k > 0 and k % 500 == 0: # output satisfaction and violation counts - print(f'values:{k},{self.exec_count_satisfication / (self.exec_count_satisfication + self.exec_count_violation)},{self.exec_count_satisfication},{self.exec_count_violation}') + print( + f"values:{k},{self.exec_count_satisfication / (self.exec_count_satisfication + self.exec_count_violation)},{self.exec_count_satisfication},{self.exec_count_violation}" + ) return None def hypothesis_testing(self, mean, alternative): - sample = np.concatenate((np.zeros(self.exec_count_violation), np.ones(self.exec_count_satisfication))) + sample = np.concatenate( + ( + np.zeros(self.exec_count_violation), + np.ones(self.exec_count_satisfication), + ) + ) return stats.ttest_1samp(sample, mean, alternative=alternative) def reset_sut(self): self.number_of_steps = 0 self.current_output = self.sut.pre() - self.current_output_aps = self.current_output.split('__') + self.current_output_aps = self.current_output.split("__") self.exec_trace = [] self.monitor_current_state = self.spec_monitor.get_init_state_number() @@ -75,7 +88,7 @@ def one_step(self): # strategyから次のアクションを決め、SULを実行する action = random.choice(self.actions) self.current_output = self.sut.step(action) - self.current_output_aps = self.current_output.split('__') + self.current_output_aps = self.current_output.split("__") # 実行列を保存 self.exec_trace.append(action) self.exec_trace.append(self.current_output) @@ -86,7 +99,7 @@ def post_sut(self): # 出力outputにより、モニターの状態遷移を行う。 # 返り値はモニターの状態遷移が行われたか否かと条件が常に成立する状態に到達したか否か。モニターの状態遷移が行えないことは、仕様の違反を意味する。 - def step_monitor(self, output_aps : List[str]) -> Tuple[bool, bool]: + def step_monitor(self, output_aps: List[str]) -> Tuple[bool, bool]: # モニターの遷移ラベルのガードと、システムの出力を比較する edges = self.spec_monitor_out[self.monitor_current_state] accept = False @@ -105,7 +118,7 @@ def step_monitor(self, output_aps : List[str]) -> Tuple[bool, bool]: # 出力outputと、モニターのedgeを受け取り、edgeの条件をoutputが満たしているか判定する # 返り値はペアで、一つ目は条件を満たしていてedgeで遷移できるならば遷移先の状態を返し、遷移できないならばNoneを返す # 二つ目はセルフループしか存在しない状態に到達したか否か(以降は条件が常に成立するか否か) - def guardCheck(self, output_aps : List[str], edge): + def guardCheck(self, output_aps: List[str], edge): cond = edge.cond # label = spot.bdd_format_formula(self.bdict, cond) # # print(f'output: {output}') @@ -115,9 +128,9 @@ def guardCheck(self, output_aps : List[str], edge): # 条件が常に成立 return (edge.dst, True) aps_bdd = buddy.bdd_support(cond) - aps = spot.bdd_format_formula(self.bdict, aps_bdd).split(' & ') + aps = spot.bdd_format_formula(self.bdict, aps_bdd).split(" & ") for ap in aps: - if (ap in output_aps): + if ap in output_aps: bdd_var = self.ap_varnum[ap][0] cond = buddy.bdd_restrict(cond, bdd_var) else: @@ -126,18 +139,26 @@ def guardCheck(self, output_aps : List[str], edge): # restricted_label = spot.bdd_format_formula(self.bdict, cond) # print(f'cond : {restricted_label}') ret = buddy.bdd_satcount(cond) - if (ret > 0): + if ret > 0: return (edge.dst, False) else: return (None, False) + def initialize_argparse(): parser = argparse.ArgumentParser() - parser.add_argument("--model-path", dest="model_path", help="path to input model", required=True) - parser.add_argument("--prop-path", dest="prop_path", help="path to property file", required=True) + parser.add_argument( + "--model-path", dest="model_path", help="path to input model", required=True + ) + parser.add_argument( + "--prop-path", dest="prop_path", help="path to property file", required=True + ) return parser + finally_regex = re.compile(r"F[ ]*\[[0-9 ]+,([0-9 ]+)\].*") + + def prop_max_step(prop_path): ret = 0 with open(prop_path) as f: @@ -151,19 +172,20 @@ def prop_max_step(prop_path): return 30 return ret + 2 + def main(): parser = initialize_argparse() args = parser.parse_args() - mdp = load_automaton_from_file(args.model_path, automaton_type='mdp') + mdp = load_automaton_from_file(args.model_path, automaton_type="mdp") # visualize_automaton(mdp) sul = MdpSUL(mdp) - with open(args.prop_path, 'r') as f: - prop_str = f.read().strip() - print(f'Property: {prop_str}') + with open(args.prop_path, "r") as f: + prop_str = f.read().strip() + print(f"Property: {prop_str}") max_exec_len = prop_max_step(args.prop_path) - print(f'Property max exec length : {max_exec_len}') + print(f"Property max exec length : {max_exec_len}") # coins actions = ["go1", "go2"] @@ -176,14 +198,20 @@ def main(): # tcp # actions = ["CLOSECONNECTION", "ACK_plus_PSH_paren_V_c_V_c_1_paren_", "SYN_plus_ACK_paren_V_c_V_c_0_paren_", "RST_paren_V_c_V_c_0_paren_", "ACCEPT", "FIN_plus_ACK_paren_V_c_V_c_0_paren_", "LISTEN", "SYN_paren_V_c_V_c_0_paren_", "RCV", "ACK_plus_RST_paren_V_c_V_c_0_paren_", "CLOSE", "ACK_paren_V_c_V_c_0_paren_"] - smc : StatisticalModelChecker = StatisticalModelChecker(sul, actions, args.prop_path, num_exec=26492, max_exec_len=max_exec_len) + smc: StatisticalModelChecker = StatisticalModelChecker( + sul, actions, args.prop_path, num_exec=26492, max_exec_len=max_exec_len + ) smc.run() - print(f'SUT value by SMC: {smc.exec_count_satisfication / smc.num_exec} (satisfication: {smc.exec_count_satisfication}, total: {smc.num_exec})') - smc_log_path = args.prop_path + '.smclog' - with open(smc_log_path, 'a+') as f: - f.write(f'{prop_str}\n{smc.exec_count_satisfication / smc.num_exec}\n{smc.exec_count_satisfication}\n{smc.num_exec}') + print( + f"SUT value by SMC: {smc.exec_count_satisfication / smc.num_exec} (satisfication: {smc.exec_count_satisfication}, total: {smc.num_exec})" + ) + smc_log_path = args.prop_path + ".smclog" + with open(smc_log_path, "a+") as f: + f.write( + f"{prop_str}\n{smc.exec_count_satisfication / smc.num_exec}\n{smc.exec_count_satisfication}\n{smc.num_exec}" + ) print("Finish evaluation of each round") if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/misc/weired_coffee.py b/misc/weired_coffee.py index e2f48af..00b8ffb 100644 --- a/misc/weired_coffee.py +++ b/misc/weired_coffee.py @@ -2,6 +2,7 @@ Learning faulty coffee machine that can be found in Chapter 5 and Chapter 7 of Martin's Tappler PhD thesis. :return learned MDP """ + from aalpy.SULs import MdpSUL from aalpy.oracles import RandomWalkEqOracle from aalpy.learning_algs import run_stochastic_Lstar @@ -13,11 +14,22 @@ input_alphabet = mdp.get_input_alphabet() sul = MdpSUL(mdp) -eq_oracle = RandomWalkEqOracle(input_alphabet, sul=sul, num_steps=4000, reset_prob=0.11, - reset_after_cex=True) +eq_oracle = RandomWalkEqOracle( + input_alphabet, sul=sul, num_steps=4000, reset_prob=0.11, reset_after_cex=True +) -learned_mdp = run_stochastic_Lstar(input_alphabet, sul, eq_oracle, n_c=20, n_resample=1000, min_rounds=10, - max_rounds=500, strategy='normal', cex_processing='rs', - samples_cex_strategy='bfs', automaton_type='smm') +learned_mdp = run_stochastic_Lstar( + input_alphabet, + sul, + eq_oracle, + n_c=20, + n_resample=1000, + min_rounds=10, + max_rounds=500, + strategy="normal", + cex_processing="rs", + samples_cex_strategy="bfs", + automaton_type="smm", +) -visualize_automaton(learned_mdp) \ No newline at end of file +visualize_automaton(learned_mdp) diff --git a/src/PrismModelConverter.py b/src/PrismModelConverter.py index 3d8478f..b9fcba7 100644 --- a/src/PrismModelConverter.py +++ b/src/PrismModelConverter.py @@ -5,18 +5,25 @@ module_regex = re.compile(r"module (\w+)") label_regex = re.compile(r"(\d+\.*\d* : \([\w\d'=]+\))") + def add_step_counter_to_prism_model(prism_model_path, output_file_path): with open(prism_model_path) as f: - with open(output_file_path, mode='w') as o: + with open(output_file_path, mode="w") as o: for line in f: module_match = module_regex.match(line) if module_match: - modified_line = module_regex.sub(r"module \1\nsteps : [0.." + str(step_bound) + r"] init 0;", line) + modified_line = module_regex.sub( + r"module \1\nsteps : [0.." + str(step_bound) + r"] init 0;", + line, + ) o.write(modified_line) else: - modified_line = label_regex.sub(r"\1&(steps'=min(" + str(step_bound) + r",steps + 1))", line) + modified_line = label_regex.sub( + r"\1&(steps'=min(" + str(step_bound) + r",steps + 1))", line + ) o.write(modified_line) + # prism_model_path = f'/Users/bo40/workspace/python/mc_exp-slot_machine.prism' # add_step_counter_to_prism_model(prism_model_path, "converted_model.prism") diff --git a/src/ProbBlackBoxChecking.py b/src/ProbBlackBoxChecking.py index 5330413..fe8b293 100644 --- a/src/ProbBlackBoxChecking.py +++ b/src/ProbBlackBoxChecking.py @@ -11,7 +11,12 @@ from aalpy.SULs import MdpSUL from aalpy.oracles import RandomWalkEqOracle, RandomWordEqOracle from aalpy.learning_algs import run_stochastic_Lstar -from aalpy.utils import load_automaton_from_file, mdp_2_prism_format, get_properties_file, get_correct_prop_values +from aalpy.utils import ( + load_automaton_from_file, + mdp_2_prism_format, + get_properties_file, + get_correct_prop_values, +) from aalpy.automata.StochasticMealyMachine import smm_to_mdp_conversion from aalpy.utils.HelperFunctions import print_observation_table @@ -24,31 +29,52 @@ prism_exception_regex = re.compile("^Exception in thread") -def evaluate_properties(prism_file_name, properties_file_name, prism_adv_path, exportstates_path, exporttrans_path, - exportlabels_path, debug=False): - logger = logging.getLogger('evaluate_properties') +def evaluate_properties( + prism_file_name, + properties_file_name, + prism_adv_path, + exportstates_path, + exporttrans_path, + exportlabels_path, + debug=False, +): + logger = logging.getLogger("evaluate_properties") if debug: - print('=============== PRISM output ===============', flush=True) + print("=============== PRISM output ===============", flush=True) import subprocess import io from os import path - prism_file = aalpy.paths.path_to_prism.split('/')[-1] - path_to_prism_file = aalpy.paths.path_to_prism[:-len(prism_file)] + prism_file = aalpy.paths.path_to_prism.split("/")[-1] + path_to_prism_file = aalpy.paths.path_to_prism[: -len(prism_file)] - exportadvmdp = '-exportadvmdp' + exportadvmdp = "-exportadvmdp" adversary_path = path.abspath(prism_adv_path) - exportstates = '-exportstates' - exporttrans = '-exporttrans' - exportlabels = '-exportlabels' + exportstates = "-exportstates" + exporttrans = "-exporttrans" + exportlabels = "-exportlabels" file_abs_path = path.abspath(prism_file_name) properties_als_path = path.abspath(properties_file_name) results = {} # PRISMの呼び出し adversaryを出力するようにパラメタ指定 proc = subprocess.Popen( - [aalpy.paths.path_to_prism, exportadvmdp, adversary_path, exportstates, exportstates_path, exporttrans, - exporttrans_path, exportlabels, exportlabels_path, file_abs_path, properties_als_path], - stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=path_to_prism_file) + [ + aalpy.paths.path_to_prism, + exportadvmdp, + adversary_path, + exportstates, + exportstates_path, + exporttrans, + exporttrans_path, + exportlabels, + exportlabels_path, + file_abs_path, + properties_als_path, + ], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + cwd=path_to_prism_file, + ) found_exception = False for line in io.TextIOWrapper(proc.stdout, encoding="utf-8"): if debug: @@ -62,7 +88,7 @@ def evaluate_properties(prism_file_name, properties_file_name, prism_adv_path, e logger.error(line) match = prism_prob_output_regex.match(line) if match: - results[f'prop{len(results) + 1}'] = float(match.group(1)) + results[f"prop{len(results) + 1}"] = float(match.group(1)) proc.kill() if debug: print("=============== end of PRISM output ===============") @@ -77,7 +103,7 @@ def sort_by_frequency_counter(sample) -> collections.Counter: prefix_closed_sample = [] for trace in sample: for i in range(2, len(trace) + 1, 2): - assert (len(trace[0:i]) % 2 == 0) + assert len(trace[0:i]) % 2 == 0 prefix_closed_sample.append(tuple(trace[0:i])) return collections.Counter(prefix_closed_sample) @@ -86,7 +112,7 @@ def sort_by_frequency_counter_in(sample) -> collections.Counter: prefix_closed_sample = [] for trace in sample: for i in range(1, len(trace) + 1, 2): - assert (len(trace[0:i]) % 2 == 1) + assert len(trace[0:i]) % 2 == 1 prefix_closed_sample.append(tuple(trace[0:i])) return collections.Counter(prefix_closed_sample) @@ -108,7 +134,7 @@ def probability_on_mdp(trace, mdp): return ret cex_candidates = sort_by_frequency(satisfied_sample) - for (exec_trace, freq) in cex_candidates: + for exec_trace, freq in cex_candidates: exec_trace = list(exec_trace) # MDPでのtraceの出現確率を計算 mdp_prob = probability_on_mdp(exec_trace, mdp) @@ -157,7 +183,7 @@ def mdp_get_probability(mdp_state, action, observation) -> float: # Make total_sample prefix closed and count the frequency cex_candidates = sort_by_frequency(total_sample) prefix_counts = sort_by_frequency_counter_in(total_sample) - for (exec_trace, freq) in cex_candidates: + for exec_trace, freq in cex_candidates: exec_trace = list(exec_trace) prefix = exec_trace[0:-2] last_action = exec_trace[-2] @@ -168,7 +194,7 @@ def mdp_get_probability(mdp_state, action, observation) -> float: mdp_prob = mdp_get_probability(mdp_state, last_action, last_observation) prefix_with_action_frequency = prefix_counts[tuple(prefix_with_action)] - assert (freq <= prefix_with_action_frequency) + assert freq <= prefix_with_action_frequency sut_prob = freq / prefix_with_action_frequency # 違う分布であれば反例として返す @@ -180,20 +206,48 @@ def mdp_get_probability(mdp_state, action, observation) -> float: return None -def initialize_strategy_bridge_and_smc(sul, prism_model_path, prism_adv_path, spec_path, hypothesis_value, - observation_table, returnCEX): +def initialize_strategy_bridge_and_smc( + sul, + prism_model_path, + prism_adv_path, + spec_path, + hypothesis_value, + observation_table, + returnCEX, +): sb = StrategyBridge(prism_adv_path, prism_model_path) - return StatisticalModelChecker(sul, sb, spec_path, hypothesis_value, observation_table, num_exec=5000, - returnCEX=returnCEX) + return StatisticalModelChecker( + sul, + sb, + spec_path, + hypothesis_value, + observation_table, + num_exec=5000, + returnCEX=returnCEX, + ) # PRISM + SMC による反例探索オラクル class ProbBBReachOracle(RandomWalkEqOracle): - def __init__(self, prism_model_path, prism_adv_path, prism_prop_path, ltl_prop_path, alphabet: list, sul: SUL, - smc_max_exec=5000, num_steps=5000, reset_after_cex=True, initial_reset_prob=0.25, - statistical_test_bound=0.025, only_classical_equivalence_testing=False, - output_dir='results', save_files_for_each_round=False, debug=False): + def __init__( + self, + prism_model_path, + prism_adv_path, + prism_prop_path, + ltl_prop_path, + alphabet: list, + sul: SUL, + smc_max_exec=5000, + num_steps=5000, + reset_after_cex=True, + initial_reset_prob=0.25, + statistical_test_bound=0.025, + only_classical_equivalence_testing=False, + output_dir="results", + save_files_for_each_round=False, + debug=False, + ): self.prism_model_path = prism_model_path self.prism_adv_path = prism_adv_path self.prism_prop_path = prism_prop_path @@ -211,8 +265,13 @@ def __init__(self, prism_model_path, prism_adv_path, prism_prop_path, ltl_prop_p # We discount the reset probability so that any length of traces are sampled in the limit. self.reset_prob_discount = 0.90 self.learned_strategy = None - super().__init__(alphabet, sul=sul, num_steps=num_steps, reset_after_cex=reset_after_cex, - reset_prob=initial_reset_prob) + super().__init__( + alphabet, + sul=sul, + num_steps=num_steps, + reset_after_cex=reset_after_cex, + reset_prob=initial_reset_prob, + ) def discount_reset_prob(self): logging.info(f"discount reset_prob to {self.reset_prob}") @@ -233,41 +292,49 @@ def find_cex(self, hypothesis): os.remove(self.prism_model_path) if os.path.isfile(self.prism_adv_path): os.remove(self.prism_adv_path) - self.converted_model_path = f'{self.prism_model_path}.convert' + self.converted_model_path = f"{self.prism_model_path}.convert" if os.path.isfile(self.converted_model_path): os.remove(self.converted_model_path) - self.exportstates_path = f'{self.prism_model_path}.sta' + self.exportstates_path = f"{self.prism_model_path}.sta" if os.path.isfile(self.exportstates_path): os.remove(self.exportstates_path) - self.exporttrans_path = f'{self.prism_model_path}.tra' + self.exporttrans_path = f"{self.prism_model_path}.tra" if os.path.isfile(self.exporttrans_path): os.remove(self.exporttrans_path) - self.exportlabels_path = f'{self.prism_model_path}.lab' + self.exportlabels_path = f"{self.prism_model_path}.lab" if os.path.isfile(self.exportlabels_path): os.remove(self.exportlabels_path) - mdp_2_prism_format(mdp, name='mc_exp', output_path=self.prism_model_path) + mdp_2_prism_format(mdp, name="mc_exp", output_path=self.prism_model_path) # PRISMのモデルにカウンタ変数を埋め込む - add_step_counter_to_prism_model(self.prism_model_path, self.converted_model_path) + add_step_counter_to_prism_model( + self.prism_model_path, self.converted_model_path + ) # PRISMでモデル検査を実行 logging.info("Model check by PRISM.") - prism_ret = evaluate_properties(self.converted_model_path, self.prism_prop_path, self.prism_adv_path, - self.exportstates_path, self.exporttrans_path, self.exportlabels_path, - debug=self.debug) + prism_ret = evaluate_properties( + self.converted_model_path, + self.prism_prop_path, + self.prism_adv_path, + self.exportstates_path, + self.exporttrans_path, + self.exportlabels_path, + debug=self.debug, + ) # 各ラウンドのファイルを保存 if self.save_files_for_each_round: self.save_prism_files() info = { - 'learning_rounds': self.rounds, - 'automaton_size': len(hypothesis.states), - 'sul.num_queries': self.sul.num_queries, - 'sul.num_steps': self.sul.num_steps, - 'eq_oracle.num_queries': self.num_queries, - 'eq_oracle.num_steps': self.num_steps, + "learning_rounds": self.rounds, + "automaton_size": len(hypothesis.states), + "sul.num_queries": self.sul.num_queries, + "sul.num_steps": self.sul.num_steps, + "eq_oracle.num_queries": self.num_queries, + "eq_oracle.num_steps": self.num_steps, } - logging.info(f'Round information : {info}') + logging.info(f"Round information : {info}") if len(prism_ret) == 0: # 仕様を計算できていない (APが存在しない場合など) @@ -291,20 +358,32 @@ def find_cex(self, hypothesis): return cex self.learned_strategy = self.prism_adv_path - hypothesis_value = prism_ret['prop1'] + hypothesis_value = prism_ret["prop1"] logging.info(f"Hypothesis probability : {hypothesis_value}") # SMCを実行する - sb = StrategyBridge(self.prism_adv_path, self.exportstates_path, self.exporttrans_path, self.exportlabels_path) - smc: StatisticalModelChecker = StatisticalModelChecker(self.sul, sb, self.ltl_prop_path, hypothesis_value, - self.observation_table, num_exec=self.smc_max_exec, - returnCEX=True) + sb = StrategyBridge( + self.prism_adv_path, + self.exportstates_path, + self.exporttrans_path, + self.exportlabels_path, + ) + smc: StatisticalModelChecker = StatisticalModelChecker( + self.sul, + sb, + self.ltl_prop_path, + hypothesis_value, + self.observation_table, + num_exec=self.smc_max_exec, + returnCEX=True, + ) cex = smc.run() logging.info( - f'SMC executed SUL {smc.number_of_steps} steps ({smc.exec_count_satisfication + smc.exec_count_violation} queries)') + f"SMC executed SUL {smc.number_of_steps} steps ({smc.exec_count_satisfication + smc.exec_count_violation} queries)" + ) if not self.only_classical_equivalence_testing: - logging.info(f'CEX from SMC: {cex}') + logging.info(f"CEX from SMC: {cex}") if cex != -1 and cex != None: # 具体的な反例が得られればそれを返す @@ -313,15 +392,16 @@ def find_cex(self, hypothesis): if cex == -1: # Observation tableがclosedかつconsistentでなくなったとき logging.info( - "Exit find_cex of ProbBBReachOracle because observation table is not closed and consistent.") + "Exit find_cex of ProbBBReachOracle because observation table is not closed and consistent." + ) return None # SMCの結果 と hypothesis_value に有意差があるか検定を行う - logging.info(f'SUT value : {smc.exec_count_satisfication / smc.num_exec}') - logging.info(f'Hypothesis value : {hypothesis_value}') + logging.info(f"SUT value : {smc.exec_count_satisfication / smc.num_exec}") + logging.info(f"Hypothesis value : {hypothesis_value}") if not self.only_classical_equivalence_testing: - hyp_test_ret = smc.hypothesis_testing(hypothesis_value, 'two-sided') - logging.info(f'Hypothesis testing result : {hyp_test_ret}') + hyp_test_ret = smc.hypothesis_testing(hypothesis_value, "two-sided") + logging.info(f"Hypothesis testing result : {hyp_test_ret}") # if hyp_test_ret violates the error bound if hyp_test_ret.pvalue < self.statistical_test_bound: @@ -329,7 +409,9 @@ def find_cex(self, hypothesis): # TODO: 複数回のSMCのサンプルの和集合をtotal_sampleとして渡すこともできるようにする logging.info("Compare frequency between SMC sample and hypothesis.") # cex = compare_frequency(smc.satisfied_exec_sample, smc.exec_sample, mdp, self.statistical_test_bound) - cex = compare_frequency_with_tail(smc.exec_sample, mdp, self.statistical_test_bound) + cex = compare_frequency_with_tail( + smc.exec_sample, mdp, self.statistical_test_bound + ) if cex != None: logging.info(f"CEX from compare_frequency : {cex}") return cex @@ -339,7 +421,7 @@ def find_cex(self, hypothesis): # SMCで反例が見つからなかったので equivalence testing logging.info("Run equivalence testing of L*mdp.") cex = super().find_cex(hypothesis) # equivalence testing - logging.info(f'CEX from EQ testing : {cex}') + logging.info(f"CEX from EQ testing : {cex}") if cex is None: self.discount_reset_prob() @@ -350,83 +432,184 @@ def save_prism_files(self): logging.info(f"Save intermediate generated files to {rounds_dir}") os.makedirs(rounds_dir, exist_ok=True) if os.path.isfile(self.prism_model_path): - shutil.copy(self.prism_model_path, f"{rounds_dir}/{os.path.basename(self.prism_model_path)}") + shutil.copy( + self.prism_model_path, + f"{rounds_dir}/{os.path.basename(self.prism_model_path)}", + ) if os.path.isfile(self.prism_adv_path): - shutil.copy(self.prism_adv_path, f"{rounds_dir}/{os.path.basename(self.prism_adv_path)}") + shutil.copy( + self.prism_adv_path, + f"{rounds_dir}/{os.path.basename(self.prism_adv_path)}", + ) if os.path.isfile(self.converted_model_path): - shutil.copy(self.converted_model_path, f"{rounds_dir}/{os.path.basename(self.converted_model_path)}") + shutil.copy( + self.converted_model_path, + f"{rounds_dir}/{os.path.basename(self.converted_model_path)}", + ) if os.path.isfile(self.exportstates_path): - shutil.copy(self.exportstates_path, f"{rounds_dir}/{os.path.basename(self.exportstates_path)}") + shutil.copy( + self.exportstates_path, + f"{rounds_dir}/{os.path.basename(self.exportstates_path)}", + ) if os.path.isfile(self.exporttrans_path): - shutil.copy(self.exporttrans_path, f"{rounds_dir}/{os.path.basename(self.exporttrans_path)}") + shutil.copy( + self.exporttrans_path, + f"{rounds_dir}/{os.path.basename(self.exporttrans_path)}", + ) if os.path.isfile(self.exportlabels_path): - shutil.copy(self.exportlabels_path, f"{rounds_dir}/{os.path.basename(self.exportlabels_path)}") + shutil.copy( + self.exportlabels_path, + f"{rounds_dir}/{os.path.basename(self.exportlabels_path)}", + ) # if self.debug: # ot = self.observation_table -def learn_mdp_and_strategy(mdp_model_path, prism_model_path, prism_adv_path, prism_prop_path, ltl_prop_path, - automaton_type='smm', n_c=20, n_resample=1000, min_rounds=20, max_rounds=240, - strategy='normal', cex_processing='longest_prefix', stopping_based_on_prop=None, - target_unambiguity=0.99, eq_num_steps=2000, - smc_max_exec=5000, smc_statistical_test_bound=0.025, eq_test_initial_reset_prob=0.25, - only_classical_equivalence_testing=False, - samples_cex_strategy=None, output_dir='results', save_files_for_each_round=False, - debug=False): - mdp = load_automaton_from_file(mdp_model_path, automaton_type='mdp') +def learn_mdp_and_strategy( + mdp_model_path, + prism_model_path, + prism_adv_path, + prism_prop_path, + ltl_prop_path, + automaton_type="smm", + n_c=20, + n_resample=1000, + min_rounds=20, + max_rounds=240, + strategy="normal", + cex_processing="longest_prefix", + stopping_based_on_prop=None, + target_unambiguity=0.99, + eq_num_steps=2000, + smc_max_exec=5000, + smc_statistical_test_bound=0.025, + eq_test_initial_reset_prob=0.25, + only_classical_equivalence_testing=False, + samples_cex_strategy=None, + output_dir="results", + save_files_for_each_round=False, + debug=False, +): + mdp = load_automaton_from_file(mdp_model_path, automaton_type="mdp") # visualize_automaton(mdp) input_alphabet = mdp.get_input_alphabet() sul = MdpSUL(mdp) - return learn_mdp_and_strategy_from_sul(sul, input_alphabet, prism_model_path, prism_adv_path, prism_prop_path, - ltl_prop_path, automaton_type, n_c, n_resample, min_rounds, max_rounds, - strategy, cex_processing, stopping_based_on_prop, target_unambiguity, - eq_num_steps, smc_max_exec, smc_statistical_test_bound, eq_test_initial_reset_prob, - only_classical_equivalence_testing, samples_cex_strategy, output_dir, - save_files_for_each_round, debug) - - -def learn_mdp_and_strategy_from_sul(sul, input_alphabet, prism_model_path, prism_adv_path, prism_prop_path, - ltl_prop_path, automaton_type='smm', n_c=20, n_resample=1000, min_rounds=20, - max_rounds=240, strategy='normal', cex_processing='longest_prefix', - stopping_based_on_prop=None, target_unambiguity=0.99, eq_num_steps=2000, - smc_max_exec=5000, smc_statistical_test_bound=0.025, eq_test_initial_reset_prob=0.25, - only_classical_equivalence_testing=False, samples_cex_strategy=None, - output_dir='results', save_files_for_each_round=False, debug=False): - logging.info(f'min_rounds: {min_rounds}') - logging.info(f'max_rounds: {max_rounds}') - logging.info(f'smc_statistical_test_bound: {smc_statistical_test_bound}') - logging.info(f'eq_test_initial_reset_prob: {eq_test_initial_reset_prob}') - - eq_oracle = ProbBBReachOracle(prism_model_path, prism_adv_path, prism_prop_path, ltl_prop_path, input_alphabet, - sul=sul, smc_max_exec=smc_max_exec, statistical_test_bound=smc_statistical_test_bound, - only_classical_equivalence_testing=only_classical_equivalence_testing, - num_steps=eq_num_steps, initial_reset_prob=eq_test_initial_reset_prob, - reset_after_cex=True, - output_dir=output_dir, save_files_for_each_round=save_files_for_each_round, - debug=debug) + return learn_mdp_and_strategy_from_sul( + sul, + input_alphabet, + prism_model_path, + prism_adv_path, + prism_prop_path, + ltl_prop_path, + automaton_type, + n_c, + n_resample, + min_rounds, + max_rounds, + strategy, + cex_processing, + stopping_based_on_prop, + target_unambiguity, + eq_num_steps, + smc_max_exec, + smc_statistical_test_bound, + eq_test_initial_reset_prob, + only_classical_equivalence_testing, + samples_cex_strategy, + output_dir, + save_files_for_each_round, + debug, + ) + + +def learn_mdp_and_strategy_from_sul( + sul, + input_alphabet, + prism_model_path, + prism_adv_path, + prism_prop_path, + ltl_prop_path, + automaton_type="smm", + n_c=20, + n_resample=1000, + min_rounds=20, + max_rounds=240, + strategy="normal", + cex_processing="longest_prefix", + stopping_based_on_prop=None, + target_unambiguity=0.99, + eq_num_steps=2000, + smc_max_exec=5000, + smc_statistical_test_bound=0.025, + eq_test_initial_reset_prob=0.25, + only_classical_equivalence_testing=False, + samples_cex_strategy=None, + output_dir="results", + save_files_for_each_round=False, + debug=False, +): + logging.info(f"min_rounds: {min_rounds}") + logging.info(f"max_rounds: {max_rounds}") + logging.info(f"smc_statistical_test_bound: {smc_statistical_test_bound}") + logging.info(f"eq_test_initial_reset_prob: {eq_test_initial_reset_prob}") + + eq_oracle = ProbBBReachOracle( + prism_model_path, + prism_adv_path, + prism_prop_path, + ltl_prop_path, + input_alphabet, + sul=sul, + smc_max_exec=smc_max_exec, + statistical_test_bound=smc_statistical_test_bound, + only_classical_equivalence_testing=only_classical_equivalence_testing, + num_steps=eq_num_steps, + initial_reset_prob=eq_test_initial_reset_prob, + reset_after_cex=True, + output_dir=output_dir, + save_files_for_each_round=save_files_for_each_round, + debug=debug, + ) # EQOracleChain print_level = 2 if debug: print_level = 3 - learned_mdp = run_stochastic_Lstar(input_alphabet=input_alphabet, eq_oracle=eq_oracle, sul=sul, n_c=n_c, - n_resample=n_resample, min_rounds=min_rounds, max_rounds=max_rounds, - automaton_type=automaton_type, strategy=strategy, cex_processing=cex_processing, - samples_cex_strategy=samples_cex_strategy, target_unambiguity=target_unambiguity, - property_based_stopping=stopping_based_on_prop, custom_oracle=True, - print_level=print_level) + learned_mdp = run_stochastic_Lstar( + input_alphabet=input_alphabet, + eq_oracle=eq_oracle, + sul=sul, + n_c=n_c, + n_resample=n_resample, + min_rounds=min_rounds, + max_rounds=max_rounds, + automaton_type=automaton_type, + strategy=strategy, + cex_processing=cex_processing, + samples_cex_strategy=samples_cex_strategy, + target_unambiguity=target_unambiguity, + property_based_stopping=stopping_based_on_prop, + custom_oracle=True, + print_level=print_level, + ) learned_strategy = eq_oracle.learned_strategy if learned_strategy is None: - logging.info('No strategy is learned') + logging.info("No strategy is learned") else: - sb = StrategyBridge(prism_adv_path, eq_oracle.exportstates_path, eq_oracle.exporttrans_path, - eq_oracle.exportlabels_path) - smc: StatisticalModelChecker = StatisticalModelChecker(sul, sb, ltl_prop_path, 0, None, num_exec=5000, - returnCEX=False) + sb = StrategyBridge( + prism_adv_path, + eq_oracle.exportstates_path, + eq_oracle.exporttrans_path, + eq_oracle.exportlabels_path, + ) + smc: StatisticalModelChecker = StatisticalModelChecker( + sul, sb, ltl_prop_path, 0, None, num_exec=5000, returnCEX=False + ) smc.run() logging.info( - f'SUT value by final SMC with {smc.num_exec} executions: {smc.exec_count_satisfication / smc.num_exec}') + f"SUT value by final SMC with {smc.num_exec} executions: {smc.exec_count_satisfication / smc.num_exec}" + ) return learned_mdp, learned_strategy diff --git a/src/Smc.py b/src/Smc.py index 15d87f9..d9bc68c 100644 --- a/src/Smc.py +++ b/src/Smc.py @@ -3,7 +3,9 @@ import numpy as np from scipy import stats from typing import Tuple, List -from aalpy.learning_algs.stochastic.SamplingBasedObservationTable import SamplingBasedObservationTable +from aalpy.learning_algs.stochastic.SamplingBasedObservationTable import ( + SamplingBasedObservationTable, +) from aalpy.base import SUL import spot @@ -13,15 +15,25 @@ class StatisticalModelChecker: - def __init__(self, mdp_sut : SUL, strategy_bridge : StrategyBridge, spec_path, sut_value, observation_table, num_exec=1000, max_exec_len=40, returnCEX=False): - self.log = logging.getLogger('StatisticalModelChecker') + def __init__( + self, + mdp_sut: SUL, + strategy_bridge: StrategyBridge, + spec_path, + sut_value, + observation_table, + num_exec=1000, + max_exec_len=40, + returnCEX=False, + ): + self.log = logging.getLogger("StatisticalModelChecker") self.sut = mdp_sut self.strategy_bridge = strategy_bridge self.sut_value = sut_value - self.observation_table : SamplingBasedObservationTable = observation_table + self.observation_table: SamplingBasedObservationTable = observation_table with open(spec_path) as f: spec = f.readline() - self.spec_monitor = spot.translate(spec, 'monitor', 'det') + self.spec_monitor = spot.translate(spec, "monitor", "det") self.bdict = self.spec_monitor.get_dict() self.spec_monitor_out = [] for s in range(0, self.spec_monitor.num_states()): @@ -29,7 +41,10 @@ def __init__(self, mdp_sut : SUL, strategy_bridge : StrategyBridge, spec_path, s self.ap_varnum = dict() for ap in self.spec_monitor.ap(): var = self.bdict.varnum(ap) - self.ap_varnum[ap.to_str()] = (buddy.bdd_ithvar(var), buddy.bdd_nithvar(var)) + self.ap_varnum[ap.to_str()] = ( + buddy.bdd_ithvar(var), + buddy.bdd_nithvar(var), + ) self.num_exec = num_exec self.max_exec_len = max_exec_len self.returnCEX = returnCEX @@ -63,24 +78,31 @@ def run(self): self.observation_table.update_obs_table_with_freq_obs() # Closedness, Consistencyの判定 row_to_close = self.observation_table.get_row_to_close() - consistency_violation = self.observation_table.get_consistency_violation() + consistency_violation = ( + self.observation_table.get_consistency_violation() + ) # Observation tableがclosedかつconsistentでなくなったときはSMCを早期終了 if row_to_close or consistency_violation: return -1 if (k + 1) % 1000 == 0: - self.log.info(f'SUT executed {k} times') + self.log.info(f"SUT executed {k} times") return None def hypothesis_testing(self, mean, alternative): - sample = np.concatenate((np.zeros(self.exec_count_violation), np.ones(self.exec_count_satisfication))) + sample = np.concatenate( + ( + np.zeros(self.exec_count_violation), + np.ones(self.exec_count_satisfication), + ) + ) return stats.ttest_1samp(sample, mean, alternative=alternative) def reset_sut(self): self.number_of_steps = 0 self.current_output = self.sut.pre() - self.current_output_aps = self.current_output.split('__') + self.current_output_aps = self.current_output.split("__") self.strategy_bridge.reset() self.exec_trace = [] self.monitor_current_state = self.spec_monitor.get_init_state_number() @@ -91,7 +113,7 @@ def one_step(self): # strategyから次のアクションを決め、SULを実行する action = self.strategy_bridge.next_action() self.current_output = self.sut.step(action) - self.current_output_aps = self.current_output.split('__') + self.current_output_aps = self.current_output.split("__") # 実行列を保存 self.exec_trace.append(action) self.exec_trace.append(self.current_output) @@ -108,7 +130,7 @@ def post_sut(self): # 出力outputにより、モニターの状態遷移を行う。 # 返り値はモニターの状態遷移が行われたか否かと条件が常に成立する状態に到達したか否か。モニターの状態遷移が行えないことは、仕様の違反を意味する。 - def step_monitor(self, output_aps : List[str]) -> Tuple[bool, bool]: + def step_monitor(self, output_aps: List[str]) -> Tuple[bool, bool]: # モニターの遷移ラベルのガードと、システムの出力を比較する edges = self.spec_monitor_out[self.monitor_current_state] accept = False @@ -127,7 +149,7 @@ def step_monitor(self, output_aps : List[str]) -> Tuple[bool, bool]: # 出力outputと、モニターのedgeを受け取り、edgeの条件をoutputが満たしているか判定する # 返り値はペアで、一つ目は条件を満たしていてedgeで遷移できるならば遷移先の状態を返し、遷移できないならばNoneを返す # 二つ目はセルフループしか存在しない状態に到達したか否か(以降は条件が常に成立するか否か) - def guardCheck(self, output_aps : List[str], edge): + def guardCheck(self, output_aps: List[str], edge): cond = edge.cond # label = spot.bdd_format_formula(self.bdict, cond) # # print(f'output: {output}') @@ -137,9 +159,9 @@ def guardCheck(self, output_aps : List[str], edge): # 条件が常に成立 return (edge.dst, True) aps_bdd = buddy.bdd_support(cond) - aps = spot.bdd_format_formula(self.bdict, aps_bdd).split(' & ') + aps = spot.bdd_format_formula(self.bdict, aps_bdd).split(" & ") for ap in aps: - if (ap in output_aps): + if ap in output_aps: bdd_var = self.ap_varnum[ap][0] cond = buddy.bdd_restrict(cond, bdd_var) else: @@ -148,8 +170,7 @@ def guardCheck(self, output_aps : List[str], edge): # restricted_label = spot.bdd_format_formula(self.bdict, cond) # print(f'cond : {restricted_label}') ret = buddy.bdd_satcount(cond) - if (ret > 0): + if ret > 0: return (edge.dst, False) else: return (None, False) - diff --git a/src/StrategyBridge.py b/src/StrategyBridge.py index 5ddabd3..cc67936 100644 --- a/src/StrategyBridge.py +++ b/src/StrategyBridge.py @@ -6,20 +6,24 @@ # Stateは多分本当はint State = int Action = str -Observation = str # モデルの出力ラベルの集合 +Observation = str # モデルの出力ラベルの集合 + # strategyと実システム(MDP)の組み合わせ # (Σin × Σout)* → Dist(Σin) という型のplayer側の戦略を作る class StrategyBridge: - def __init__(self, strategy_path, states_path, trans_path, labels_path): self.initial_state = 0 # 状態sにいる確率が current_state[s] self.current_state: Dict[State, float] = dict() - self.strategy: Dict[State, Action] = dict()# adv.traから得られたもの - self.observation_map: Dict[State, Observation] = dict() # .prismから得られたもの + self.strategy: Dict[State, Action] = dict() # adv.traから得られたもの + self.observation_map: Dict[State, Observation] = ( + dict() + ) # .prismから得られたもの # 本当はこんなに複雑じゃなくて良いかも - self.next_state: Dict[Tuple[State, Action, Observation], Dict[State, float]] = dict() # adv.traから得られたもの + self.next_state: Dict[Tuple[State, Action, Observation], Dict[State, float]] = ( + dict() + ) # adv.traから得られたもの # self.history : List[Tuple[Action, Observation]] = [] self.__init_state_and_observation(labels_path) @@ -27,11 +31,11 @@ def __init__(self, strategy_path, states_path, trans_path, labels_path): self.states = set(self.strategy.keys()) self.actions = set(self.strategy.values()) self.current_state = {self.initial_state: 1.0} - self.empty_dist : Dict[Action, float] = dict.fromkeys(self.actions, 0.0) - self.actions_list : List[Action] = list(self.empty_dist.keys()) + self.empty_dist: Dict[Action, float] = dict.fromkeys(self.actions, 0.0) + self.actions_list: List[Action] = list(self.empty_dist.keys()) def next_action(self) -> Action: - dist : Dict[Action, float] = self.empty_dist.copy() + dist: Dict[Action, float] = self.empty_dist.copy() is_empty_dist = True for state, weight in self.current_state.items(): # self.strategy[state] : strategyでstateに対応づけられているアクション @@ -41,7 +45,7 @@ def next_action(self) -> Action: # actionがsampleされる確率がdist[action]というサンプリング prob_dist = list(dist.values()) - action : Action = "" + action: Action = "" if is_empty_dist: action = random.choice(self.actions_list) else: @@ -73,7 +77,7 @@ def update_state(self, action: Action, observation_aps: List[str]) -> bool: else: # TODO: found counterexample? # if weight > 0: - # print("update_state(found_counter example?)" + str(state) + "," + action + "," + observation) + # print("update_state(found_counter example?)" + str(state) + "," + action + "," + observation) pass # new_stateの正規化 prob_sum = sum(new_state.values()) @@ -120,19 +124,22 @@ def __init_state_and_observation(self, labels_path): if label_idx in labels_name: label_name = labels_name[label_idx] else: - label_name = 'unknownobservation' + label_name = "unknownobservation" if int(state) in self.observation_map: self.observation_map[int(state)].append(label_name) else: self.observation_map[int(state)] = [label_name] for k in self.observation_map.keys(): - self.observation_map[k] = StrategyBridge.__sort_observation(self.observation_map[k]) - + self.observation_map[k] = StrategyBridge.__sort_observation( + self.observation_map[k] + ) # PRISMの反例ファイルを読み込んで strategyとnext_stateを初期化する def __init_strategy(self, strategy_path, trans_path): regex = re.compile(r"(\d+) \d\.?\d* (\d+) (\d\.?\d*) (\w+)") - next_state_temp : Dict[Tuple[State, Action, Observation], Dict[State, float]] = dict() + next_state_temp: Dict[Tuple[State, Action, Observation], Dict[State, float]] = ( + dict() + ) with open(strategy_path) as f: for line in f: match = regex.match(line) @@ -165,11 +172,11 @@ def __init_strategy(self, strategy_path, trans_path): self.next_state = dict() for k, prob_map in next_state_temp.items(): prob_sum = sum(prob_map.values()) - dist : Dict[State, float] = dict() + dist: Dict[State, float] = dict() for s, prob in prob_map.items(): dist[s] = prob / prob_sum self.next_state[k] = dist # observation_aps (APの集合) をAPの辞書順で整列させ、"__"で連結した文字列として返す - def __sort_observation(observation_aps : List[str]) -> Observation: + def __sort_observation(observation_aps: List[str]) -> Observation: return "__".join(sorted(observation_aps)) diff --git a/src/eval_each_round.py b/src/eval_each_round.py index bc8413b..58cd29c 100644 --- a/src/eval_each_round.py +++ b/src/eval_each_round.py @@ -10,12 +10,24 @@ def initialize_argparse(): parser = argparse.ArgumentParser() - parser.add_argument("--rounds-log-dir", dest="rounds_log_dir", help="path to log directory.", required=True) - parser.add_argument("--model-path", dest="model_path", help="path to input model", required=True) - parser.add_argument("--prop-path", dest="prop_path", help="path to property file", required=True) + parser.add_argument( + "--rounds-log-dir", + dest="rounds_log_dir", + help="path to log directory.", + required=True, + ) + parser.add_argument( + "--model-path", dest="model_path", help="path to input model", required=True + ) + parser.add_argument( + "--prop-path", dest="prop_path", help="path to property file", required=True + ) return parser + finally_regex = re.compile(r"F[ ]*\[[0-9 ]+,([0-9 ]+)\].*") + + def prop_max_step(prop_path): ret = 0 with open(prop_path) as f: @@ -29,6 +41,7 @@ def prop_max_step(prop_path): return 30 return ret + 2 + def main(): parser = initialize_argparse() args = parser.parse_args() @@ -38,12 +51,12 @@ def main(): exporttrans_file_name = "mc_exp.prism.tra" exportlabels_file_name = "mc_exp.prism.lab" - mdp = load_automaton_from_file(args.model_path, automaton_type='mdp') + mdp = load_automaton_from_file(args.model_path, automaton_type="mdp") # visualize_automaton(mdp) sul = MdpSUL(mdp) max_exec_len = prop_max_step(args.prop_path) - print(f'Property max exec length : {max_exec_len}') + print(f"Property max exec length : {max_exec_len}") for file in os.listdir(args.rounds_log_dir): d = os.path.join(args.rounds_log_dir, file) @@ -53,15 +66,36 @@ def main(): exportstates_path = os.path.join(d, exportstates_file_name) exporttrans_path = os.path.join(d, exporttrans_file_name) exportlabels_path = os.path.join(d, exportlabels_file_name) - if os.path.exists(adv_path) and os.path.exists(exportstates_path) and os.path.exists(exporttrans_path) and os.path.exists(exportlabels_path): - sb = StrategyBridge(adv_path, exportstates_path, exporttrans_path, exportlabels_path) - smc : StatisticalModelChecker = StatisticalModelChecker(sul, sb, args.prop_path, 0, None, num_exec=5000, max_exec_len=max_exec_len, returnCEX=False) + if ( + os.path.exists(adv_path) + and os.path.exists(exportstates_path) + and os.path.exists(exporttrans_path) + and os.path.exists(exportlabels_path) + ): + sb = StrategyBridge( + adv_path, exportstates_path, exporttrans_path, exportlabels_path + ) + smc: StatisticalModelChecker = StatisticalModelChecker( + sul, + sb, + args.prop_path, + 0, + None, + num_exec=5000, + max_exec_len=max_exec_len, + returnCEX=False, + ) smc.run() - print(f'SUT value by SMC at {d}: {smc.exec_count_satisfication / smc.num_exec} (satisfication: {smc.exec_count_satisfication}, total: {smc.num_exec})') + print( + f"SUT value by SMC at {d}: {smc.exec_count_satisfication / smc.num_exec} (satisfication: {smc.exec_count_satisfication}, total: {smc.num_exec})" + ) smc_log_path = os.path.join(d, "smc.log") - with open(smc_log_path, 'w+') as f: - f.write(f'{smc.exec_count_satisfication / smc.num_exec}\n{smc.exec_count_satisfication}\n{smc.num_exec}') + with open(smc_log_path, "w+") as f: + f.write( + f"{smc.exec_count_satisfication / smc.num_exec}\n{smc.exec_count_satisfication}\n{smc.num_exec}" + ) print("Finish evaluation of each round") + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/main.py b/src/main.py index 69e822d..2b65cdc 100644 --- a/src/main.py +++ b/src/main.py @@ -9,23 +9,103 @@ def initialize_argparse(): parser = argparse.ArgumentParser() - parser.add_argument("--model-file", dest="model_file", help="path to input dot model", required=True) - parser.add_argument("--prop-file", dest="prop_file", help="path to property file", required=True) - parser.add_argument("--prism-path", dest="prism_path", help="path to PRISM", required=True) - parser.add_argument("--output-dir", dest="output_dir", help="name of output directory (Default value = 'results')", default="results") - parser.add_argument("--save-files-for-each-round", dest="save_files_for_each_round", action="store_true", help="save files(model, hypothesis, strategy) for each rounds") - parser.add_argument("--min-rounds", dest="min_rounds", type=int, help="minimum number of learning rounds of L*mdp (Default value = 20)", default=20) - parser.add_argument("--max-rounds", dest="max_rounds", type=int, help="if learning_rounds >= max_rounds, L*mdp learning will stop (Default value = 240)", default=240) - parser.add_argument("--l-star-mdp-strategy", dest="l_star_mdp_strategy", help="either one of ['classic', 'normal', 'chi2'] or a object implementing DifferenceChecker class,\ndefault value is 'normal'. Classic strategy is the one presented\nin the seed paper, 'normal' is the updated version and chi2 is based on chi squared.", default="normal") - parser.add_argument("--n-cutoff", dest="n_c", type=int, help="cutoff for a cell to be considered complete (Default value = 20), only used with 'classic' strategy", default=20) - parser.add_argument("--n-resample", dest="n_resample", type=int, help="resampling size (Default value = 100), only used with 'classic' L*mdp strategy", default=100) - parser.add_argument("--target-unambiguity", dest="target_unambiguity", type=float, help="target unambiguity value of L*mdp (default 0.99)", default=0.99) - parser.add_argument("--eq-num-steps", dest="eq_num_steps", type=int, help="number of steps to be preformed by equivalence oracle", default=2000) - parser.add_argument("--smc-max-exec", dest="smc_max_exec", type=int, help="max number of executions by SMC (default=5000)", default=5000) - parser.add_argument("--only-classical-equivalence-testing", dest="only_classical_equivalence_testing", - help="Skip the strategy guided equivalence testing using SMC", action='store_true') - parser.add_argument("--smc-statistical-test-bound", dest="smc_statistical_test_bound", type=float, help="statistical test bound of difference check between SMC and model-checking (default 0.025)", default=0.025) - parser.add_argument("-v", "--verbose", "--debug", dest="debug", action="store_true", help="output debug messages") + parser.add_argument( + "--model-file", dest="model_file", help="path to input dot model", required=True + ) + parser.add_argument( + "--prop-file", dest="prop_file", help="path to property file", required=True + ) + parser.add_argument( + "--prism-path", dest="prism_path", help="path to PRISM", required=True + ) + parser.add_argument( + "--output-dir", + dest="output_dir", + help="name of output directory (Default value = 'results')", + default="results", + ) + parser.add_argument( + "--save-files-for-each-round", + dest="save_files_for_each_round", + action="store_true", + help="save files(model, hypothesis, strategy) for each rounds", + ) + parser.add_argument( + "--min-rounds", + dest="min_rounds", + type=int, + help="minimum number of learning rounds of L*mdp (Default value = 20)", + default=20, + ) + parser.add_argument( + "--max-rounds", + dest="max_rounds", + type=int, + help="if learning_rounds >= max_rounds, L*mdp learning will stop (Default value = 240)", + default=240, + ) + parser.add_argument( + "--l-star-mdp-strategy", + dest="l_star_mdp_strategy", + help="either one of ['classic', 'normal', 'chi2'] or a object implementing DifferenceChecker class,\ndefault value is 'normal'. Classic strategy is the one presented\nin the seed paper, 'normal' is the updated version and chi2 is based on chi squared.", + default="normal", + ) + parser.add_argument( + "--n-cutoff", + dest="n_c", + type=int, + help="cutoff for a cell to be considered complete (Default value = 20), only used with 'classic' strategy", + default=20, + ) + parser.add_argument( + "--n-resample", + dest="n_resample", + type=int, + help="resampling size (Default value = 100), only used with 'classic' L*mdp strategy", + default=100, + ) + parser.add_argument( + "--target-unambiguity", + dest="target_unambiguity", + type=float, + help="target unambiguity value of L*mdp (default 0.99)", + default=0.99, + ) + parser.add_argument( + "--eq-num-steps", + dest="eq_num_steps", + type=int, + help="number of steps to be preformed by equivalence oracle", + default=2000, + ) + parser.add_argument( + "--smc-max-exec", + dest="smc_max_exec", + type=int, + help="max number of executions by SMC (default=5000)", + default=5000, + ) + parser.add_argument( + "--only-classical-equivalence-testing", + dest="only_classical_equivalence_testing", + help="Skip the strategy guided equivalence testing using SMC", + action="store_true", + ) + parser.add_argument( + "--smc-statistical-test-bound", + dest="smc_statistical_test_bound", + type=float, + help="statistical test bound of difference check between SMC and model-checking (default 0.025)", + default=0.025, + ) + parser.add_argument( + "-v", + "--verbose", + "--debug", + dest="debug", + action="store_true", + help="output debug messages", + ) return parser @@ -33,32 +113,47 @@ def initialize_argparse(): def main(): parser = initialize_argparse() args = parser.parse_args() - logging.basicConfig(format='%(asctime)s %(module)s[%(lineno)d] [%(levelname)s]: %(message)s', - stream=sys.stdout, - level=logging.INFO if not args.debug else logging.DEBUG) + logging.basicConfig( + format="%(asctime)s %(module)s[%(lineno)d] [%(levelname)s]: %(message)s", + stream=sys.stdout, + level=logging.INFO if not args.debug else logging.DEBUG, + ) aalpy.paths.path_to_prism = args.prism_path output_dir = abspath(args.output_dir) os.makedirs(output_dir, exist_ok=True) mdp_model_path = args.model_file - prism_model_path = f'{output_dir}/mc_exp.prism' - prism_adv_path = f'{output_dir}/adv.tra' + prism_model_path = f"{output_dir}/mc_exp.prism" + prism_adv_path = f"{output_dir}/adv.tra" prop_file = args.prop_file prism_prop_path = prop_file prop_file_name, _ = os.path.splitext(prop_file) - ltl_prop_path = f'{prop_file_name}.ltl' + ltl_prop_path = f"{prop_file_name}.ltl" - learned_mdp, strategy = learn_mdp_and_strategy(mdp_model_path, prism_model_path, prism_adv_path, prism_prop_path, ltl_prop_path, - output_dir=output_dir, save_files_for_each_round=args.save_files_for_each_round, - min_rounds=args.min_rounds, max_rounds=args.max_rounds, strategy=args.l_star_mdp_strategy, n_c=args.n_c, n_resample=args.n_resample, - target_unambiguity=args.target_unambiguity, eq_num_steps=args.eq_num_steps, smc_max_exec=args.smc_max_exec, + learned_mdp, strategy = learn_mdp_and_strategy( + mdp_model_path, + prism_model_path, + prism_adv_path, + prism_prop_path, + ltl_prop_path, + output_dir=output_dir, + save_files_for_each_round=args.save_files_for_each_round, + min_rounds=args.min_rounds, + max_rounds=args.max_rounds, + strategy=args.l_star_mdp_strategy, + n_c=args.n_c, + n_resample=args.n_resample, + target_unambiguity=args.target_unambiguity, + eq_num_steps=args.eq_num_steps, + smc_max_exec=args.smc_max_exec, only_classical_equivalence_testing=args.only_classical_equivalence_testing, - smc_statistical_test_bound=args.smc_statistical_test_bound, debug=args.debug) + smc_statistical_test_bound=args.smc_statistical_test_bound, + debug=args.debug, + ) print("Finish prob bbc") if __name__ == "__main__": main() - diff --git a/src/prism_export_to_dot_model.py b/src/prism_export_to_dot_model.py index 192b076..db2d64b 100644 --- a/src/prism_export_to_dot_model.py +++ b/src/prism_export_to_dot_model.py @@ -1,7 +1,8 @@ import sys import re -except_names = ['init', 'notEnd'] +except_names = ["init", "notEnd"] + def main(): if len(sys.argv) < 1: @@ -10,7 +11,7 @@ def main(): file_path = sys.argv[1] states = {} - states_regex = re.compile('(\d+):(.*)') + states_regex = re.compile("(\d+):(.*)") with open(file_path + ".sta", "r") as f_states: for line in f_states: m = states_regex.match(line) @@ -18,7 +19,7 @@ def main(): states[m[1]] = m[2].strip() transitions = [] - trans_regex = re.compile('(\d+) (\d+) (\d+) (\d+\.?\d*) (.*)') + trans_regex = re.compile("(\d+) (\d+) (\d+) (\d+\.?\d*) (.*)") with open(file_path + ".tra", "r") as f_trans: for line in f_trans: m = trans_regex.match(line) @@ -30,14 +31,14 @@ def main(): initial_state = None label_name_line_regex = re.compile('(\d+="\w+" )*(\d+="\w+")') label_name_regex = re.compile('(\d+)="(\w+)"') - label_regex = re.compile('(\d+): (.*)') - label_index_regex = re.compile('\d+') + label_regex = re.compile("(\d+): (.*)") + label_index_regex = re.compile("\d+") with open(file_path + ".lab", "r") as f_labels: for line in f_labels: m = label_name_line_regex.match(line) if m: m_names = label_name_regex.findall(line) - for (index, name) in m_names: + for index, name in m_names: name_of_labels[index] = name else: m = label_regex.match(line) @@ -62,10 +63,19 @@ def main(): for state_index in states: f.write(state_index) f.write(' [shape="circle" label="' + labels[state_index] + '"];\n') - for (current_sta, _, next_sta, probability, action) in transitions: - f.write(current_sta + ' -> ' + next_sta + ' [label="' + action + ':' + probability + '"];\n') - f.write('__start0 -> 0;\n') - f.write('}\n') + for current_sta, _, next_sta, probability, action in transitions: + f.write( + current_sta + + " -> " + + next_sta + + ' [label="' + + action + + ":" + + probability + + '"];\n' + ) + f.write("__start0 -> 0;\n") + f.write("}\n") if __name__ == "__main__": diff --git a/src/tests/Smc_test.py b/src/tests/Smc_test.py index eb0925e..e1b43da 100644 --- a/src/tests/Smc_test.py +++ b/src/tests/Smc_test.py @@ -6,15 +6,16 @@ from aalpy.SULs import MdpSUL from aalpy.utils import load_automaton_from_file + def custom_print(aut): bdict = aut.get_dict() print("Acceptance:", aut.get_acceptance()) print("Number of sets:", aut.num_sets()) print("Number of states: ", aut.num_states()) print("Initial states: ", aut.get_init_state_number()) - print("Atomic propositions:", end='') + print("Atomic propositions:", end="") for ap in aut.ap(): - print(' ', ap, ' (=', bdict.varnum(ap), ')', sep='', end='') + print(" ", ap, " (=", bdict.varnum(ap), ")", sep="", end="") print() # Templated methods are not available in Python, so we cannot # retrieve/attach arbitrary objects from/to the automaton. However the @@ -41,16 +42,20 @@ def custom_print(aut): print(" label =", spot.bdd_format_formula(bdict, t.cond)) print(" acc sets =", t.acc) + class SMCTestCase(unittest.TestCase): def test_initialize(self): - example = 'shared_coin' - mdp = load_automaton_from_file(f'/Users/bo40/workspace/python/AALpy/DotModels/MDPs/{example}.dot', automaton_type='mdp') + example = "shared_coin" + mdp = load_automaton_from_file( + f"/Users/bo40/workspace/python/AALpy/DotModels/MDPs/{example}.dot", + automaton_type="mdp", + ) sul = MdpSUL(mdp) - sample_prism_model = f'/Users/bo40/workspace/python/mc_exp_sample.prism' - sample_prism_adv = f'/Users/bo40/workspace/python/adv_sample.tra' + sample_prism_model = f"/Users/bo40/workspace/python/mc_exp_sample.prism" + sample_prism_adv = f"/Users/bo40/workspace/python/adv_sample.tra" sb = StrategyBridge(sample_prism_adv, sample_prism_model) - spec_path = f'/Users/bo40/workspace/python/sandbox/shared_coin.ltl' + spec_path = f"/Users/bo40/workspace/python/sandbox/shared_coin.ltl" smc = StatisticalModelChecker(sul, sb, spec_path) custom_print(smc.spec_monitor) @@ -66,24 +71,26 @@ def test_initialize(self): 0 def test_smc_run(self): - example = 'shared_coin' - mdp = load_automaton_from_file(f'/Users/bo40/workspace/python/AALpy/DotModels/MDPs/{example}.dot', automaton_type='mdp') + example = "shared_coin" + mdp = load_automaton_from_file( + f"/Users/bo40/workspace/python/AALpy/DotModels/MDPs/{example}.dot", + automaton_type="mdp", + ) sul = MdpSUL(mdp) - sample_prism_model = f'/Users/bo40/workspace/python/mc_exp_sample.prism' - sample_prism_adv = f'/Users/bo40/workspace/python/adv_sample.tra' + sample_prism_model = f"/Users/bo40/workspace/python/mc_exp_sample.prism" + sample_prism_adv = f"/Users/bo40/workspace/python/adv_sample.tra" sb = StrategyBridge(sample_prism_adv, sample_prism_model) - spec_path = f'/Users/bo40/workspace/python/sandbox/shared_coin.ltl' + spec_path = f"/Users/bo40/workspace/python/sandbox/shared_coin.ltl" smc = StatisticalModelChecker(sul, sb, spec_path, num_exec=5000) smc.run() - print(f'sat : {smc.exec_count_satisfication}') - print(f'vio : {smc.exec_count_violation}') - ret = smc.hypothesis_testing(0.29, 'two-sided') - print(f'pvalue : {ret.pvalue}') + print(f"sat : {smc.exec_count_satisfication}") + print(f"vio : {smc.exec_count_violation}") + ret = smc.hypothesis_testing(0.29, "two-sided") + print(f"pvalue : {ret.pvalue}") 0 - -if __name__ == '__main__': - unittest.main() \ No newline at end of file +if __name__ == "__main__": + unittest.main() diff --git a/src/tests/StrategyBridge_test.py b/src/tests/StrategyBridge_test.py index e2fa458..97e874f 100644 --- a/src/tests/StrategyBridge_test.py +++ b/src/tests/StrategyBridge_test.py @@ -1,68 +1,70 @@ import unittest from ..StrategyBridge import StrategyBridge -class StrategyBridgeTestCase(unittest.TestCase): - def test_initialize(self): - sample_prism_model = f'/Users/bo40/workspace/python/mc_exp_sample.prism' - sample_prism_adv = f'/Users/bo40/workspace/python/adv_sample.tra' - sb = StrategyBridge(sample_prism_adv, sample_prism_model) - self.assertEqual(sb.initial_state, 0) - self.assertEqual(sb.observation_map[0], "____start") - self.assertEqual(sb.observation_map[5], 'c1_heads__c2_tails__eight') - self.assertEqual(sb.observation_map[10], 'c1_tails__c2_heads__eight') - self.assertEqual(sb.observation_map[15], 'agree__c1_tails__c2_tails__six') - self.assertEqual(sb.observation_map[20], 'agree__c1_tails__c2_tails__seven') - self.assertEqual(sb.strategy[0], "go2") - self.assertEqual(sb.strategy[4], "go2") - self.assertEqual(sb.strategy[12], "go2") - self.assertEqual(sb.strategy[16], "go2") - self.assertEqual(sb.strategy[20], "go2") - self.assertEqual(sb.strategy[25], "go1") - self.assertEqual(sb.next_state[(0, 'go2', 'agree__c1_tails__c2_tails__six')], {42: 1.0}) +class StrategyBridgeTestCase(unittest.TestCase): + def test_initialize(self): + sample_prism_model = f"/Users/bo40/workspace/python/mc_exp_sample.prism" + sample_prism_adv = f"/Users/bo40/workspace/python/adv_sample.tra" + sb = StrategyBridge(sample_prism_adv, sample_prism_model) + self.assertEqual(sb.initial_state, 0) + self.assertEqual(sb.observation_map[0], "____start") + self.assertEqual(sb.observation_map[5], "c1_heads__c2_tails__eight") + self.assertEqual(sb.observation_map[10], "c1_tails__c2_heads__eight") + self.assertEqual(sb.observation_map[15], "agree__c1_tails__c2_tails__six") + self.assertEqual(sb.observation_map[20], "agree__c1_tails__c2_tails__seven") + self.assertEqual(sb.strategy[0], "go2") + self.assertEqual(sb.strategy[4], "go2") + self.assertEqual(sb.strategy[12], "go2") + self.assertEqual(sb.strategy[16], "go2") + self.assertEqual(sb.strategy[20], "go2") + self.assertEqual(sb.strategy[25], "go1") + self.assertEqual( + sb.next_state[(0, "go2", "agree__c1_tails__c2_tails__six")], {42: 1.0} + ) - def test_next(self): - sample_prism_model = f'/Users/bo40/workspace/python/mc_exp_sample.prism' - sample_prism_adv = f'/Users/bo40/workspace/python/adv_sample.tra' - sb = StrategyBridge(sample_prism_adv, sample_prism_model) + def test_next(self): + sample_prism_model = f"/Users/bo40/workspace/python/mc_exp_sample.prism" + sample_prism_adv = f"/Users/bo40/workspace/python/adv_sample.tra" + sb = StrategyBridge(sample_prism_adv, sample_prism_model) - self.assertEqual(sb.next_action(), "go2") - sb.update_state("go2", 'agree__c1_tails__c2_tails__six') - self.assertEqual(sb.current_state[42], 1.0) - self.assertEqual(sb.next_action(), "go2") - sb.update_state("go2", 'agree__c1_tails__c2_tails__five') - self.assertEqual(sb.current_state[47], 1.0) - # self.assertEqual(sb.history, [("go2", 'agree__c1_tails__c2_tails__six'), ("go2", 'agree__c1_tails__c2_tails__five')]) + self.assertEqual(sb.next_action(), "go2") + sb.update_state("go2", "agree__c1_tails__c2_tails__six") + self.assertEqual(sb.current_state[42], 1.0) + self.assertEqual(sb.next_action(), "go2") + sb.update_state("go2", "agree__c1_tails__c2_tails__five") + self.assertEqual(sb.current_state[47], 1.0) + # self.assertEqual(sb.history, [("go2", 'agree__c1_tails__c2_tails__six'), ("go2", 'agree__c1_tails__c2_tails__five')]) - def test_reset(self): - sample_prism_model = f'/Users/bo40/workspace/python/mc_exp_sample.prism' - sample_prism_adv = f'/Users/bo40/workspace/python/adv_sample.tra' - sb = StrategyBridge(sample_prism_adv, sample_prism_model) + def test_reset(self): + sample_prism_model = f"/Users/bo40/workspace/python/mc_exp_sample.prism" + sample_prism_adv = f"/Users/bo40/workspace/python/adv_sample.tra" + sb = StrategyBridge(sample_prism_adv, sample_prism_model) - sb.update_state("go2", 'agree__c1_tails__c2_tails__six') - self.assertEqual(sb.current_state[42], 1.0) - # self.assertEqual(sb.history, [("go2", 'agree__c1_tails__c2_tails__six')]) + sb.update_state("go2", "agree__c1_tails__c2_tails__six") + self.assertEqual(sb.current_state[42], 1.0) + # self.assertEqual(sb.history, [("go2", 'agree__c1_tails__c2_tails__six')]) - sb.reset() + sb.reset() - self.assertEqual(sb.current_state[42], 0) - # self.assertEqual(sb.history, []) + self.assertEqual(sb.current_state[42], 0) + # self.assertEqual(sb.history, []) - def test_update_state_false(self): - sample_prism_model = f'/Users/bo40/workspace/python/mc_exp_sample.prism' - sample_prism_adv = f'/Users/bo40/workspace/python/adv_sample.tra' - sb = StrategyBridge(sample_prism_adv, sample_prism_model) + def test_update_state_false(self): + sample_prism_model = f"/Users/bo40/workspace/python/mc_exp_sample.prism" + sample_prism_adv = f"/Users/bo40/workspace/python/adv_sample.tra" + sb = StrategyBridge(sample_prism_adv, sample_prism_model) - # TODO : implement this test - self.assertEqual(sb.next_action(), "go2") - sb.update_state("go2", 'agree__c1_tails__c2_tails__six') - self.assertEqual(sb.current_state[42], 1.0) - self.assertEqual(sb.next_action(), "go2") - sb.update_state("go2", 'agree__c1_tails__c2_tails__five') - self.assertEqual(sb.current_state[47], 1.0) - # self.assertEqual(sb.history, [("go2", 'agree__c1_tails__c2_tails__six'), ("go2", 'agree__c1_tails__c2_tails__five')]) + # TODO : implement this test + self.assertEqual(sb.next_action(), "go2") + sb.update_state("go2", "agree__c1_tails__c2_tails__six") + self.assertEqual(sb.current_state[42], 1.0) + self.assertEqual(sb.next_action(), "go2") + sb.update_state("go2", "agree__c1_tails__c2_tails__five") + self.assertEqual(sb.current_state[47], 1.0) + # self.assertEqual(sb.history, [("go2", 'agree__c1_tails__c2_tails__six'), ("go2", 'agree__c1_tails__c2_tails__five')]) -if __name__ == '__main__': - unittest.main() \ No newline at end of file +if __name__ == "__main__": + unittest.main() diff --git a/stat/scripts/plot_graph.py b/stat/scripts/plot_graph.py index 99ac9a1..4629af5 100644 --- a/stat/scripts/plot_graph.py +++ b/stat/scripts/plot_graph.py @@ -14,75 +14,102 @@ output_file = sys.argv[3] true_value = float(sys.argv[4]) if len(sys.argv) > 5: - xlimit = int(sys.argv[5]) + xlimit = int(sys.argv[5]) else: - xlimit = 0 + xlimit = 0 if len(sys.argv) > 6: - xtarget = int(sys.argv[6]) + xtarget = int(sys.argv[6]) else: - xtarget = None + xtarget = None if len(sys.argv) > 7: - uniform_smc_file = sys.argv[7] + uniform_smc_file = sys.argv[7] else: - uniform_smc_file = None + uniform_smc_file = None if len(sys.argv) > 9: - ymin = float(sys.argv[8]) - ymax = float(sys.argv[9]) + ymin = float(sys.argv[8]) + ymax = float(sys.argv[9]) else: - ymin = None - ymax = None + ymin = None + ymax = None # stat file of our method # 0-round 1-probability 2-round' 3-#-of-queries 4-#-of-steps -probbbc_prob_data = np.loadtxt(stat_file, usecols=1, dtype='float') -probbbc_step_data = np.loadtxt(stat_file, usecols=4, dtype='int64') +probbbc_prob_data = np.loadtxt(stat_file, usecols=1, dtype="float") +probbbc_step_data = np.loadtxt(stat_file, usecols=4, dtype="int64") if len(probbbc_prob_data) != len(probbbc_step_data): - print(f'stat file of our method is broken : {stat_file}') - exit() + print(f"stat file of our method is broken : {stat_file}") + exit() # stat file of baseline method # 0-#-of-steps, 1-probability -baseline_prob_data = np.loadtxt(baseline_file, usecols=1, dtype='float') -baseline_step_data = np.loadtxt(baseline_file, usecols=0, dtype='int64') +baseline_prob_data = np.loadtxt(baseline_file, usecols=1, dtype="float") +baseline_step_data = np.loadtxt(baseline_file, usecols=0, dtype="int64") if len(baseline_prob_data) != len(baseline_step_data): - print(f'stat file of baseline method is broken : {baseline_file}') - exit() + print(f"stat file of baseline method is broken : {baseline_file}") + exit() -plt.rcParams['font.family'] = 'Times New Roman' -plt.rcParams['mathtext.fontset'] = 'stix' # math fontの設定 +plt.rcParams["font.family"] = "Times New Roman" +plt.rcParams["mathtext.fontset"] = "stix" # math fontの設定 plt.rcParams["font.size"] = 15 # plt.rcParams['figure.dpi'] = 300 -plt.rcParams['figure.figsize'] = (6.5, 3.5) +plt.rcParams["figure.figsize"] = (6.5, 3.5) fig = plt.figure() ax = fig.add_subplot(111) # plot data of baseline method -ax.plot(baseline_step_data, baseline_prob_data, color='#F92816', alpha=0.2, marker='o', markersize=0.5, linestyle='', label="baseline method") -label_baseline, = ax.plot([-1000_0000],[0], color='#F92816', marker='o', linestyle='', label="baseline method") +ax.plot( + baseline_step_data, + baseline_prob_data, + color="#F92816", + alpha=0.2, + marker="o", + markersize=0.5, + linestyle="", + label="baseline method", +) +(label_baseline,) = ax.plot( + [-1000_0000], + [0], + color="#F92816", + marker="o", + linestyle="", + label="baseline method", +) # plot data of our method # green : 0CC94B or 079C29, blue : 091DF4 or 040EC6 -ax.plot(probbbc_step_data, probbbc_prob_data, color='#091DF4', alpha=0.6, marker='o', markersize=0.5, linestyle='', label="our method") -label_probbbc, = ax.plot([-1000_0000],[0], color='#091DF4', marker='o', linestyle='', label="our method") +ax.plot( + probbbc_step_data, + probbbc_prob_data, + color="#091DF4", + alpha=0.6, + marker="o", + markersize=0.5, + linestyle="", + label="our method", +) +(label_probbbc,) = ax.plot( + [-1000_0000], [0], color="#091DF4", marker="o", linestyle="", label="our method" +) # plot true value -label_true = ax.axhline(true_value, ls = "-.", color='k') +label_true = ax.axhline(true_value, ls="-.", color="k") ax.set_xlabel(r"Number of executed steps on the SUT") ax.set_ylabel(r"Probability $p$") if xlimit > 0: - ax.set_xlim(0, xlimit) + ax.set_xlim(0, xlimit) else: - ax.set_xlim(0, max(probbbc_step_data.max(), baseline_step_data.max())) + ax.set_xlim(0, max(probbbc_step_data.max(), baseline_step_data.max())) if ymin and ymax: - ax.set_ylim(ymin, ymax) + ax.set_ylim(ymin, ymax) # else: # ax.set_ylim(-0.02) -ax.legend(handles=[label_probbbc, label_baseline]) # 凡例 +ax.legend(handles=[label_probbbc, label_baseline]) # 凡例 # Gaussian process regression # print(f'GPR : fiting our method (sample size : {len(probbbc_step_data)})') @@ -128,42 +155,45 @@ plt.clf() if xtarget: - # plt.rcParams['figure.dpi'] = 300 - plt.rcParams['figure.figsize'] = (3.5, 3.5) - # boxplot data of our method - xtarget_diff = 100000 - probbbc_data = np.loadtxt(stat_file, usecols=[1,4]) - # probbbc_data = probbbc_data[np.any((probbbc_data > xtarget - xtarget_diff) & (probbbc_data < xtarget + xtarget_diff), axis=1)] - index = np.argsort(np.abs(probbbc_data[:,1] - xtarget)) - probbbc_data = probbbc_data[index] - probbbc_data = probbbc_data[:,0] - probbbc_data = probbbc_data[:60] - print(f'boxplot data of our method has {len(probbbc_data)} data') - - # boxplot data of baseline method - baseline_data = np.loadtxt(baseline_file) - # baseline_data = baseline_data[np.any((baseline_data > xtarget - xtarget_diff) & (baseline_data < xtarget + xtarget_diff), axis=1)] - index = np.argsort(np.abs(baseline_data[:,0] - xtarget)) - baseline_data = baseline_data[index] - baseline_data = baseline_data[:,1] - baseline_data = baseline_data[:60] - print(f'boxplot data of baseline method has {len(baseline_data)} data') - - - fig = plt.figure() - ax = fig.add_subplot(111) - - if uniform_smc_file: - uniform_data = np.loadtxt(uniform_smc_file) - ax.boxplot([probbbc_data, baseline_data, uniform_data], widths=[.5,.5,.5]) - ax.set_xticklabels(['ProbBBC', 'baseline', 'uniform']) - else: - ax.boxplot([probbbc_data, baseline_data], widths=[.5,.5]) - ax.set_xticklabels(['our method', 'baseline method']) - ax.axhline(true_value, ls = '--') - # ax.set_ylim(0.29, 0.52) - plt.ylabel(r"Probability $p$") - - plt.show() - - fig.savefig(os.path.splitext(output_file)[0] + '_boxplot.eps', bbox_inches="tight", pad_inches=0.05) + # plt.rcParams['figure.dpi'] = 300 + plt.rcParams["figure.figsize"] = (3.5, 3.5) + # boxplot data of our method + xtarget_diff = 100000 + probbbc_data = np.loadtxt(stat_file, usecols=[1, 4]) + # probbbc_data = probbbc_data[np.any((probbbc_data > xtarget - xtarget_diff) & (probbbc_data < xtarget + xtarget_diff), axis=1)] + index = np.argsort(np.abs(probbbc_data[:, 1] - xtarget)) + probbbc_data = probbbc_data[index] + probbbc_data = probbbc_data[:, 0] + probbbc_data = probbbc_data[:60] + print(f"boxplot data of our method has {len(probbbc_data)} data") + + # boxplot data of baseline method + baseline_data = np.loadtxt(baseline_file) + # baseline_data = baseline_data[np.any((baseline_data > xtarget - xtarget_diff) & (baseline_data < xtarget + xtarget_diff), axis=1)] + index = np.argsort(np.abs(baseline_data[:, 0] - xtarget)) + baseline_data = baseline_data[index] + baseline_data = baseline_data[:, 1] + baseline_data = baseline_data[:60] + print(f"boxplot data of baseline method has {len(baseline_data)} data") + + fig = plt.figure() + ax = fig.add_subplot(111) + + if uniform_smc_file: + uniform_data = np.loadtxt(uniform_smc_file) + ax.boxplot([probbbc_data, baseline_data, uniform_data], widths=[0.5, 0.5, 0.5]) + ax.set_xticklabels(["ProbBBC", "baseline", "uniform"]) + else: + ax.boxplot([probbbc_data, baseline_data], widths=[0.5, 0.5]) + ax.set_xticklabels(["our method", "baseline method"]) + ax.axhline(true_value, ls="--") + # ax.set_ylim(0.29, 0.52) + plt.ylabel(r"Probability $p$") + + plt.show() + + fig.savefig( + os.path.splitext(output_file)[0] + "_boxplot.eps", + bbox_inches="tight", + pad_inches=0.05, + ) From 348144e316f2eafc58d667c27abdec7287217725 Mon Sep 17 00:00:00 2001 From: satos Date: Thu, 5 Sep 2024 19:07:47 +0900 Subject: [PATCH 3/4] apply ruff check --fix --- misc/MV_python_integrator.py | 7 +++---- misc/dfa_lean.py | 1 - misc/learn_then_model_check.py | 4 ---- misc/mdp_model_check.py | 1 - src/ProbBlackBoxChecking.py | 9 ++------- src/StrategyBridge.py | 3 +-- src/prism_export_to_dot_model.py | 2 +- src/tests/Smc_test.py | 12 ++++++------ src/tests/StrategyBridge_test.py | 16 ++++++++-------- stat/scripts/plot_graph.py | 4 ---- 10 files changed, 21 insertions(+), 38 deletions(-) diff --git a/misc/MV_python_integrator.py b/misc/MV_python_integrator.py index 3227329..98a5db7 100644 --- a/misc/MV_python_integrator.py +++ b/misc/MV_python_integrator.py @@ -1,12 +1,11 @@ # -*- coding: utf-8 -*- from abc import ABC -import sys, random +import sys import socket import pickle import copy from py4j.java_gateway import JavaGateway, GatewayParameters, CallbackServerParameters -from aalpy.base import Automaton, AutomatonState from aalpy.SULs import MdpSUL from aalpy.utils import load_automaton_from_file from StrategyBridge import StrategyBridge @@ -24,8 +23,8 @@ def __init__(self): ) self.sul = MdpSUL(mdp) # == デバッグ用 == - self.prism_model_path = f"/Users/bo40/workspace/python/mc_exp.prism" - self.prism_adv_path = f"/Users/bo40/workspace/python/adv.tra" + self.prism_model_path = "/Users/bo40/workspace/python/mc_exp.prism" + self.prism_adv_path = "/Users/bo40/workspace/python/adv.tra" # ==== self.strategy_bridge = StrategyBridge( self.prism_adv_path, self.prism_model_path diff --git a/misc/dfa_lean.py b/misc/dfa_lean.py index e10965a..93c6040 100644 --- a/misc/dfa_lean.py +++ b/misc/dfa_lean.py @@ -1,5 +1,4 @@ from aalpy.utils import ( - load_automaton_from_file, save_automaton_to_file, visualize_automaton, generate_random_dfa, diff --git a/misc/learn_then_model_check.py b/misc/learn_then_model_check.py index 7d4ecf2..ef72655 100644 --- a/misc/learn_then_model_check.py +++ b/misc/learn_then_model_check.py @@ -4,11 +4,7 @@ from aalpy.oracles import RandomWalkEqOracle, RandomWordEqOracle from aalpy.learning_algs import run_stochastic_Lstar from aalpy.utils import ( - visualize_automaton, load_automaton_from_file, - model_check_experiment, - get_properties_file, - get_correct_prop_values, mdp_2_prism_format, ) from aalpy.automata.StochasticMealyMachine import smm_to_mdp_conversion diff --git a/misc/mdp_model_check.py b/misc/mdp_model_check.py index 1280189..d4617c0 100644 --- a/misc/mdp_model_check.py +++ b/misc/mdp_model_check.py @@ -4,7 +4,6 @@ from aalpy.oracles import RandomWalkEqOracle, RandomWordEqOracle from aalpy.learning_algs import run_stochastic_Lstar from aalpy.utils import ( - visualize_automaton, load_automaton_from_file, model_check_experiment, get_properties_file, diff --git a/src/ProbBlackBoxChecking.py b/src/ProbBlackBoxChecking.py index fe8b293..d2e3eee 100644 --- a/src/ProbBlackBoxChecking.py +++ b/src/ProbBlackBoxChecking.py @@ -3,22 +3,17 @@ import collections import os import shutil -from sys import prefix -from typing import List import aalpy.paths -from aalpy.base import Oracle, SUL +from aalpy.base import SUL from aalpy.automata import StochasticMealyMachine from aalpy.SULs import MdpSUL -from aalpy.oracles import RandomWalkEqOracle, RandomWordEqOracle +from aalpy.oracles import RandomWalkEqOracle from aalpy.learning_algs import run_stochastic_Lstar from aalpy.utils import ( load_automaton_from_file, mdp_2_prism_format, - get_properties_file, - get_correct_prop_values, ) from aalpy.automata.StochasticMealyMachine import smm_to_mdp_conversion -from aalpy.utils.HelperFunctions import print_observation_table from Smc import StatisticalModelChecker from StrategyBridge import StrategyBridge diff --git a/src/StrategyBridge.py b/src/StrategyBridge.py index cc67936..cb262e9 100644 --- a/src/StrategyBridge.py +++ b/src/StrategyBridge.py @@ -1,7 +1,6 @@ import re import random -import numpy as np -from typing import Dict, Tuple, Set, List +from typing import Dict, Tuple, List # Stateは多分本当はint State = int diff --git a/src/prism_export_to_dot_model.py b/src/prism_export_to_dot_model.py index db2d64b..eb8c6d5 100644 --- a/src/prism_export_to_dot_model.py +++ b/src/prism_export_to_dot_model.py @@ -49,7 +49,7 @@ def main(): name = name_of_labels[idx] if name == "init": initial_state = m[1] - if not name in except_names: + if name not in except_names: label_names.append(name) labels[m[1]] = "__".join(label_names) diff --git a/src/tests/Smc_test.py b/src/tests/Smc_test.py index e1b43da..a67445b 100644 --- a/src/tests/Smc_test.py +++ b/src/tests/Smc_test.py @@ -51,11 +51,11 @@ def test_initialize(self): automaton_type="mdp", ) sul = MdpSUL(mdp) - sample_prism_model = f"/Users/bo40/workspace/python/mc_exp_sample.prism" - sample_prism_adv = f"/Users/bo40/workspace/python/adv_sample.tra" + sample_prism_model = "/Users/bo40/workspace/python/mc_exp_sample.prism" + sample_prism_adv = "/Users/bo40/workspace/python/adv_sample.tra" sb = StrategyBridge(sample_prism_adv, sample_prism_model) - spec_path = f"/Users/bo40/workspace/python/sandbox/shared_coin.ltl" + spec_path = "/Users/bo40/workspace/python/sandbox/shared_coin.ltl" smc = StatisticalModelChecker(sul, sb, spec_path) custom_print(smc.spec_monitor) @@ -77,11 +77,11 @@ def test_smc_run(self): automaton_type="mdp", ) sul = MdpSUL(mdp) - sample_prism_model = f"/Users/bo40/workspace/python/mc_exp_sample.prism" - sample_prism_adv = f"/Users/bo40/workspace/python/adv_sample.tra" + sample_prism_model = "/Users/bo40/workspace/python/mc_exp_sample.prism" + sample_prism_adv = "/Users/bo40/workspace/python/adv_sample.tra" sb = StrategyBridge(sample_prism_adv, sample_prism_model) - spec_path = f"/Users/bo40/workspace/python/sandbox/shared_coin.ltl" + spec_path = "/Users/bo40/workspace/python/sandbox/shared_coin.ltl" smc = StatisticalModelChecker(sul, sb, spec_path, num_exec=5000) smc.run() diff --git a/src/tests/StrategyBridge_test.py b/src/tests/StrategyBridge_test.py index 97e874f..0c84094 100644 --- a/src/tests/StrategyBridge_test.py +++ b/src/tests/StrategyBridge_test.py @@ -4,8 +4,8 @@ class StrategyBridgeTestCase(unittest.TestCase): def test_initialize(self): - sample_prism_model = f"/Users/bo40/workspace/python/mc_exp_sample.prism" - sample_prism_adv = f"/Users/bo40/workspace/python/adv_sample.tra" + sample_prism_model = "/Users/bo40/workspace/python/mc_exp_sample.prism" + sample_prism_adv = "/Users/bo40/workspace/python/adv_sample.tra" sb = StrategyBridge(sample_prism_adv, sample_prism_model) self.assertEqual(sb.initial_state, 0) @@ -25,8 +25,8 @@ def test_initialize(self): ) def test_next(self): - sample_prism_model = f"/Users/bo40/workspace/python/mc_exp_sample.prism" - sample_prism_adv = f"/Users/bo40/workspace/python/adv_sample.tra" + sample_prism_model = "/Users/bo40/workspace/python/mc_exp_sample.prism" + sample_prism_adv = "/Users/bo40/workspace/python/adv_sample.tra" sb = StrategyBridge(sample_prism_adv, sample_prism_model) self.assertEqual(sb.next_action(), "go2") @@ -38,8 +38,8 @@ def test_next(self): # self.assertEqual(sb.history, [("go2", 'agree__c1_tails__c2_tails__six'), ("go2", 'agree__c1_tails__c2_tails__five')]) def test_reset(self): - sample_prism_model = f"/Users/bo40/workspace/python/mc_exp_sample.prism" - sample_prism_adv = f"/Users/bo40/workspace/python/adv_sample.tra" + sample_prism_model = "/Users/bo40/workspace/python/mc_exp_sample.prism" + sample_prism_adv = "/Users/bo40/workspace/python/adv_sample.tra" sb = StrategyBridge(sample_prism_adv, sample_prism_model) sb.update_state("go2", "agree__c1_tails__c2_tails__six") @@ -52,8 +52,8 @@ def test_reset(self): # self.assertEqual(sb.history, []) def test_update_state_false(self): - sample_prism_model = f"/Users/bo40/workspace/python/mc_exp_sample.prism" - sample_prism_adv = f"/Users/bo40/workspace/python/adv_sample.tra" + sample_prism_model = "/Users/bo40/workspace/python/mc_exp_sample.prism" + sample_prism_adv = "/Users/bo40/workspace/python/adv_sample.tra" sb = StrategyBridge(sample_prism_adv, sample_prism_model) # TODO : implement this test diff --git a/stat/scripts/plot_graph.py b/stat/scripts/plot_graph.py index 4629af5..cb95fd3 100644 --- a/stat/scripts/plot_graph.py +++ b/stat/scripts/plot_graph.py @@ -1,13 +1,9 @@ import sys import os -import math import numpy as np -import matplotlib from matplotlib import pyplot as plt # Gaussian process regression -from sklearn.gaussian_process import GaussianProcessRegressor -from sklearn.gaussian_process.kernels import WhiteKernel, RBF stat_file = sys.argv[1] baseline_file = sys.argv[2] From 28be2972e5cac3032452337502a4f31855d6d871 Mon Sep 17 00:00:00 2001 From: satos Date: Thu, 5 Sep 2024 19:20:06 +0900 Subject: [PATCH 4/4] apply manual fix warned by ruff check --- misc/MV_python_integrator.py | 2 +- src/ProbBlackBoxChecking.py | 4 ++-- src/prism_export_to_dot_model.py | 7 +++++-- src/tests/Smc_test.py | 9 ++++++--- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/misc/MV_python_integrator.py b/misc/MV_python_integrator.py index 98a5db7..2bad079 100644 --- a/misc/MV_python_integrator.py +++ b/misc/MV_python_integrator.py @@ -78,7 +78,7 @@ def get_time(self): return self.number_of_steps def eval(self, observation): - if self.current_output == None: + if self.current_output is None: return 0 else: # MDPの出力は次のような文字列 'agree__six__c1_tails__c2_tails' diff --git a/src/ProbBlackBoxChecking.py b/src/ProbBlackBoxChecking.py index d2e3eee..61e5319 100644 --- a/src/ProbBlackBoxChecking.py +++ b/src/ProbBlackBoxChecking.py @@ -380,7 +380,7 @@ def find_cex(self, hypothesis): if not self.only_classical_equivalence_testing: logging.info(f"CEX from SMC: {cex}") - if cex != -1 and cex != None: + if cex != -1 and cex is not None: # 具体的な反例が得られればそれを返す return cex @@ -407,7 +407,7 @@ def find_cex(self, hypothesis): cex = compare_frequency_with_tail( smc.exec_sample, mdp, self.statistical_test_bound ) - if cex != None: + if cex is not None: logging.info(f"CEX from compare_frequency : {cex}") return cex logging.info("Could not find counterexample by compare_frequency.") diff --git a/src/prism_export_to_dot_model.py b/src/prism_export_to_dot_model.py index eb8c6d5..3a03ff2 100644 --- a/src/prism_export_to_dot_model.py +++ b/src/prism_export_to_dot_model.py @@ -28,7 +28,8 @@ def main(): labels = {} name_of_labels = {} - initial_state = None + # XXX: unused + # initial_state = None label_name_line_regex = re.compile('(\d+="\w+" )*(\d+="\w+")') label_name_regex = re.compile('(\d+)="(\w+)"') label_regex = re.compile("(\d+): (.*)") @@ -48,7 +49,9 @@ def main(): for idx in m_index: name = name_of_labels[idx] if name == "init": - initial_state = m[1] + # XXX: unused + # initial_state = m[1] + pass if name not in except_names: label_names.append(name) labels[m[1]] = "__".join(label_names) diff --git a/src/tests/Smc_test.py b/src/tests/Smc_test.py index a67445b..24e4aea 100644 --- a/src/tests/Smc_test.py +++ b/src/tests/Smc_test.py @@ -61,11 +61,14 @@ def test_initialize(self): custom_print(smc.spec_monitor) smc.reset_sut() - ret = smc.one_step() + # XXX: unused + # ret = smc.one_step() smc.step_monitor(smc.current_output) - ret = smc.one_step() + # XXX: unused + # ret = smc.one_step() smc.step_monitor(smc.current_output) - ret = smc.one_step() + # XXX: unused + # ret = smc.one_step() smc.step_monitor(smc.current_output) 0