PK2'OQB""snakipy/__init__.py"""Snake""" __version__ = "0.0.2" PKK&O pV$V$snakipy/main.pyimport curses from enum import Enum import logging import random import sys import numpy as np from .snake import Direction logger = logging.getLogger(__name__) curses_colors = ( curses.COLOR_WHITE, curses.COLOR_CYAN, curses.COLOR_BLUE, curses.COLOR_GREEN, curses.COLOR_YELLOW, curses.COLOR_MAGENTA, curses.COLOR_RED, curses.COLOR_RED, curses.COLOR_RED, curses.COLOR_RED, curses.COLOR_RED, ) FRUIT_REWARD = 10 DEATH_REWARD = -50 DISTANCE_REWARD = 2 class Game: """ Contains and manages the game state """ def __init__( self, width, height, *, snakes=None, player_snake=None, max_number_of_fruits=1, max_number_of_snakes=1, log=None, view_size=3, border=False, seed=None ): self.fruits = [] if snakes is None and player_snake is None: raise ValueError("There are no snakes!") if snakes is None: snakes = [] self.snakes = snakes self.player_snake = player_snake if player_snake: self.snakes.append(player_snake) self.width, self.height = width, height self.log = log self.view_size = view_size self.border = border self.max_number_of_fruits = max_number_of_fruits self.max_number_of_snakes = max_number_of_snakes self.rewards = [0 for s in snakes] self.closest_distance = [None for s in snakes] self.rng = np.random.RandomState(seed) self.update_fruits() def __iter__(self): game_over = False while True: direction = yield if self.player_snake: self.punish_circles(self.player_snake, direction) self.player_snake.update(direction) for snake in self.snakes: if snake is not self.player_snake: snake.update(None) self.check_collisions() if not self.snakes: game_over = True self.update_fruits() self.update_distances() if game_over: break def punish_circles(self, snake, new_direction): dir_list = list(Direction) dir_idx = dir_list.index(snake.direction) snake_idx = self.snakes.index(snake) i1 = (dir_idx + 1) % 4 i2 = (dir_idx - 1) % 4 if dir_list[i1] == new_direction or dir_list[i2] == new_direction: self.rewards[snake_idx] -= 1 def update_fruits(self): """Add fruits to the game until max_number_of_fruits is reached.""" while len(self.fruits) < self.max_number_of_fruits: new_x, new_y = ( self.rng.randint(0, self.width - 1), self.rng.randint(0, self.height - 1), ) self.fruits.append((new_x, new_y)) def update_distances(self): new_distances = self.determine_fruit_distances() for idx, (old_dist, new_dist) in enumerate( zip(self.closest_distance, new_distances) ): if old_dist is None: self.closest_distance[idx] = new_dist continue if new_dist < old_dist: self.rewards[idx] += DISTANCE_REWARD elif new_dist > old_dist: self.rewards[idx] -= DISTANCE_REWARD + 1 self.closest_distance[idx] = new_dist def determine_fruit_distances(self): return [ min([self.fruit_distance(snake, fruit) for fruit in self.fruits]) for snake in self.snakes ] @staticmethod def fruit_distance(snake, fruit): x, y = snake.coordinates[-1] xf, yf = fruit return abs(x - xf) + abs(y - yf) def check_collisions(self): fruits_to_be_deleted = [] snakes_to_be_deleted = [] for s_idx, s in enumerate(self.snakes): x_s, y_s = s.coordinates[-1] if self.border: if any((x_s < 0, x_s >= self.width, y_s < 0, y_s >= self.height)): snakes_to_be_deleted.append(s) self.rewards[s_idx] += DEATH_REWARD continue else: x_s %= self.width y_s %= self.height s.coordinates[-1] = x_s, y_s # Check fruit collision for fruit in self.fruits: if (x_s, y_s) == fruit: s.length += 2 fruits_to_be_deleted.append(fruit) self.rewards[s_idx] += FRUIT_REWARD logger.debug("Snake %s got a fruit", s_idx) # Check snake collisions for s2_idx, s2 in enumerate(self.snakes): if s_idx != s2_idx: for x2s, y2s in s2.coordinates: if (x_s, y_s) == (x2s, y2s): snakes_to_be_deleted.append(s) else: for x2s, y2s in list(s2.coordinates)[:-1]: if (x_s, y_s) == (x2s, y2s): snakes_to_be_deleted.append(s) self.rewards[s_idx] += DEATH_REWARD for tbd in fruits_to_be_deleted: self.fruits.remove(tbd) for snk in snakes_to_be_deleted: self.snakes.remove(snk) @property def state_array(self): """ Return array of current state. The game board is encoded as follows: Snake body: 1 Fruit : 2 """ state = np.zeros((self.width, self.height, 2), float) for snake in self.snakes: for x, y in snake.coordinates: state[x, y, 0] = 1 for x, y in self.fruits: state[x, y, 1] = 1 return state def get_surrounding_view(self, snake, onehot=False): vs = self.view_size idx = self.snakes.index(snake) arr = self.state_array x, y = self.snakes[idx].coordinates[-1] view = np.roll(arr, (arr.shape[0] // 2 - x, arr.shape[1] // 2 - y), axis=(0, 1)) view = view[ view.shape[0] // 2 - vs + 1 : view.shape[0] // 2 + vs, view.shape[1] // 2 - vs + 1 : view.shape[1] // 2 + vs, ].T if onehot: vec = np.zeros((*view.shape, 2), int) nonzero = view > 0 vec[nonzero, view[nonzero] - 1] = 1 return vec return view def coordinate_occupied(self, coord): if coord in self.fruits: return 1 if any(coord in snake.coordinates for snake in self.snakes): return 2 def is_wall_or_snake(self, coord): if self.border: if coord[0] in (-1, self.width) or coord[1] in (-1, self.height): return True for snake in self.snakes: if coord in snake.coordinates: return True return False def fruit_ahead(self, coord, direction): head_x, head_y = coord # look north if direction == Direction.NORTH: for y in reversed(range(head_y)): if (head_x, y) in self.fruits: return True # look east if direction == Direction.EAST: for x in range(head_x + 1, self.width): if (x, head_y) in self.fruits: return True # look south if direction == Direction.SOUTH: for y in range(head_y + 1, self.height): if (head_x, y) in self.fruits: return True # look west if direction == Direction.WEST: for x in reversed(range(head_x)): if (x, head_y) in self.fruits: return True return False def reduced_coordinates(self, snake): """ Returns an array of length three. If the first entry is one, there is a fruit left to the snake. If the second entry is one, there is a fruit ahead of the snake. If the third entry is one, there is a fruit right of the snake. Parameters ---------- snake : Snake """ head_x, head_y = snake.coordinates[-1] direction = snake.direction result = np.zeros((4, 2)) # look north if self.is_wall_or_snake((head_x, head_y - 1)): result[0, 1] = 1 if self.fruit_ahead((head_x, head_y), Direction.NORTH): result[0, 0] = 1 # look east if self.is_wall_or_snake((head_x + 1, head_y)): result[1, 1] = 1 if self.fruit_ahead((head_x, head_y), Direction.EAST): result[1, 0] = 1 # look south if self.is_wall_or_snake((head_x, head_y + 1)): result[2, 1] = 1 if self.fruit_ahead((head_x, head_y), Direction.SOUTH): result[2, 0] = 1 # look west if self.is_wall_or_snake((head_x - 1, head_y)): result[3, 1] = 1 if self.fruit_ahead((head_x, head_y), Direction.WEST): result[3, 0] = 1 direction_idx = direction.value result = np.roll(result, Direction.EAST.value - direction_idx, axis=0) return result[:3] PK'Nczesnakipy/neuro.pyimport pickle import numpy as np def cross_entropy(y, y_net): n = y.shape[0] return -1 / n * (y * np.log(y_net) + (1 - y) * np.log(1 - y_net)).sum(axis=0) def sigmoid(x): ex = np.exp(x) return ex / (1 + ex) class NeuralNet: def __init__(self, in_size, hl_size, out_size, dna=None): if dna is not None: self.dna = dna else: size = (in_size + 1) * hl_size + (hl_size + 1) * out_size self.dna = np.random.randn(size) self.W1 = self.dna[: (in_size + 1) * hl_size].reshape((in_size + 1, hl_size)) self.W2 = self.dna[(in_size + 1) * hl_size :].reshape((hl_size + 1, out_size)) if dna is None: self.W1 /= np.sqrt(self.W1.shape[0]) self.W2 /= np.sqrt(self.W2.shape[0]) def forward(self, x1): x2 = np.tanh(x1 @ self.W1[:-1] + self.W1[-1]) x3 = x2 @ self.W2[:-1] + self.W2[-1] softmax_x3 = np.exp(x3 - x3.max(axis=-1, keepdims=True)) softmax_x3 /= softmax_x3.sum(axis=-1, keepdims=True) return softmax_x3 def decide(self, x1): return np.argmax(self.forward(x1)) def __getstate__(self): return {"W1": self.W1, "W2": self.W2} def __setstate__(self, state): self.__dict__.update(state) PKH&O"lJnH H snakipy/snake.pyfrom collections import deque from enum import Enum import random import logging import numpy as np from .neuro import NeuralNet logger = logging.getLogger(__name__) Direction = Enum("Direction", "NORTH EAST SOUTH WEST") class Snake: def __init__(self, x, y, max_x, max_y, direction): self.coordinates = deque([(x, y)]) self.max_x, self.max_y = max_x, max_y self.direction = direction self.length = 1 def __repr__(self): x, y = self.coordinates[-1] return f"Snake({x}, {y})" @classmethod def random_init(cls, width, height): start_direction = random.choice(list(Direction)) x, y = random.randint(1, width - 1), random.randint(1, height - 1) return cls(x, y, width, height, start_direction) def update(self, direction): if direction: new_direction = direction else: new_direction = self.direction head_x, head_y = self.coordinates[-1] # Do not allow 180° turnaround if (new_direction, self.direction) in [ (Direction.NORTH, Direction.SOUTH), (Direction.SOUTH, Direction.NORTH), (Direction.EAST, Direction.WEST), (Direction.WEST, Direction.EAST), ]: new_direction = self.direction if new_direction == Direction.NORTH: new_x, new_y = head_x, head_y - 1 elif new_direction == Direction.EAST: new_x, new_y = head_x + 1, head_y elif new_direction == Direction.SOUTH: new_x, new_y = head_x, head_y + 1 else: new_x, new_y = head_x - 1, head_y self.direction = new_direction self.coordinates.append((new_x, new_y)) if len(self.coordinates) > self.length: self.coordinates.popleft() class NeuroSnake(Snake): def __init__( self, x, y, max_x, max_y, input_size, hidden_size, direction=None, dna=None ): super().__init__(x, y, max_x, max_y, direction=direction) self.net = NeuralNet(input_size, hidden_size, 3, dna=dna) self.net_output = None def decide_direction(self, view): dirs = list(Direction) if self.direction is None: self.direction = random.choice(dirs) return self.direction self.net_output = self.net.forward(view) dir_idx = self.direction.value - 1 idx = np.argmax(self.net_output) - 1 new_dir = dirs[(dir_idx + idx) % 4] logger.debug("Old direction: %s", self.direction) logger.debug("New direction: %s", new_dir) return new_dir PKx'O']nbb snakipy/ui.py"""Classes for rendering the Snake game""" import curses import logging import multiprocessing import fire import numpy as np from scipy.optimize import minimize from tqdm import trange from abc_algorithm.main import Swarm from .main import Game from .snake import Direction, Snake, NeuroSnake logger = logging.getLogger(__name__) curses_colors = ( curses.COLOR_WHITE, curses.COLOR_CYAN, curses.COLOR_BLUE, curses.COLOR_GREEN, curses.COLOR_YELLOW, curses.COLOR_MAGENTA, curses.COLOR_RED, curses.COLOR_RED, curses.COLOR_RED, curses.COLOR_RED, curses.COLOR_RED, ) class UI: def __init__(self, game: Game, **kwargs): self.game = game def draw_fruits(self, screen): for x, y in self.game.fruits: screen.addstr(y, x, "O", curses.color_pair(6)) def draw_snake(self, screen, snake): for x, y in snake.coordinates: screen.addstr(y, x, "X", curses.color_pair(3)) def draw(self, screen): self.draw_fruits(screen) for snake in self.game.snakes: self.draw_snake(screen, snake) def run(self): pass class Curses(UI): def __init__( self, game, *, debug=False, robot=False, generate_data=False, sleep=70 ): super().__init__(game) self.debug = debug self.robot = robot self.generate_data = generate_data self.sleep = sleep def check_input(self, screen): inp = screen.getch() if inp == curses.KEY_UP: direction = Direction.NORTH elif inp == curses.KEY_DOWN: direction = Direction.SOUTH elif inp == curses.KEY_LEFT: direction = Direction.WEST elif inp == curses.KEY_RIGHT: direction = Direction.EAST else: direction = None return direction def run(self): curses.wrapper(self._loop) def debug_msg(self, screen, msg): screen.addstr(0, 0, msg) def _loop(self, screen): y, x = screen.getmaxyx() assert ( self.game.width <= x and self.game.height <= y ), f"Wrong game dimensions {self.game.width}, {self.game.height} != {x}, {y}!" y -= 1 game = self.game player_snake = self.game.player_snake curses.curs_set(0) screen.nodelay(True) for i in range(1, 11): curses.init_pair(i, curses_colors[i], curses.COLOR_BLACK) game_it = iter(game) direction = None while True: screen.clear() # coords = self.game.reduced_coordinates(player_snake).flatten() coords = self.game.state_array.flatten() if self.debug: # arr = self.game.reduced_coordinates(player_snake) self.debug_msg( screen, str( [ coords, player_snake.net_output, game.rewards, player_snake.direction, ] ), ) self.draw(screen) screen.refresh() curses.napms(self.sleep) game_it.send(direction) player_input = self.check_input(screen) if player_input is None and self.robot: direction = player_snake.decide_direction(coords) else: direction = player_input if self.generate_data: pass class LogPositions(UI): def run(self): for _ in self.game: for i, snake in enumerate(self.game.snakes): print(f"{i}) {snake} (reward: {self.game.rewards}") class LogStates(UI): def run(self): for _ in self.game: print(self.game.state_array) class ParameterSearch: def __init__( self, game_options, snake_options, max_steps=10_000, n_average=10, dna=None ): self.game_options = game_options self.snake_options = snake_options self.max_steps = max_steps self.n_average = n_average self.dna = dna def benchmark(self, dna): score = 0 for _ in range(self.n_average): game = Game( **self.game_options, player_snake=NeuroSnake(**self.snake_options, dna=dna), ) score += self.run(game) return -score / self.n_average def run(self, game): game_it = iter(game) direction = None player_snake = game.player_snake for step in range(self.max_steps): try: game_it.send(direction) except StopIteration: break direction = player_snake.decide_direction(game.state_array.flatten()) logger.debug("Stopped after %s steps", step) return game.rewards[0] def _get_screen_size(screen): y, x = screen.getmaxyx() return x, y def get_screen_size(): print(curses.wrapper(_get_screen_size)) def main( debug=False, robot=False, dna_file=None, width=None, n_fruits=30, hidden_size=10, sleep=70, border=False, ): logging.basicConfig(level=logging.DEBUG) x, y = curses.wrapper(_get_screen_size) if width: x = width y = width if dna_file: dna = np.load(dna_file) else: dna = None input_size = 6 game = Game( x, y, player_snake=NeuroSnake( x // 2, y // 2, max_x=x, max_y=y, input_size=input_size, hidden_size=hidden_size, dna=dna, direction=Direction.SOUTH, ), max_number_of_fruits=n_fruits, border=border, ) ui = Curses(game, debug=debug, robot=robot, sleep=sleep) try: ui.run() except StopIteration: print("Game Over") print("Score:", *game.rewards) def training( n_optimize=100, hidden_size=10, max_steps=100, search_radius=1, log_level="info", n_employed=20, n_onlooker=20, n_fruits=10, n_average=10, border=False, dna_file=None, width=20, height=None, seed=None, ): logging.basicConfig(level=getattr(logging, log_level.upper())) x = width y = height if height else x input_size = x * y * 2 # Reduce y-size by one to avoid curses scroll problems game_options = { "width": x, "height": y, "max_number_of_fruits": n_fruits, "border": border, "seed": seed, } snake_options = { "x": x // 2, "y": y // 2, "max_x": x, "max_y": y, "input_size": input_size, "hidden_size": hidden_size, "direction": Direction.SOUTH, } if dna_file: try: dna = np.load(dna_file) except FileNotFoundError: logger.error("File not found") dna = None else: dna = np.random.normal( size=(input_size + 1) * hidden_size + (hidden_size + 1) * 3, loc=0, scale=1.0, ) ui = ParameterSearch( game_options, snake_options, max_steps=max_steps, n_average=n_average, dna=dna ) swarm = Swarm( ui.benchmark, (input_size + 1) * hidden_size + (hidden_size + 1) * 3, n_employed=n_employed, n_onlooker=n_onlooker, limit=10, max_cycles=n_optimize, lower_bound=-1, upper_bound=1, search_radius=search_radius, ) for result in swarm.run(): logger.info("Saving to %s", dna_file) np.save(dna_file, result) def entrypoint(): fire.Fire() PK!HM./(snakipy-0.0.2.dist-info/entry_points.txtN+I/N.,()*KNzVy%Ey%\\PKNV..snakipy-0.0.2.dist-info/LICENSEMIT License Copyright (c) 2016 Gabriel Kabbe Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HMuSasnakipy-0.0.2.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UD"PK!Hfn snakipy-0.0.2.dist-info/METADATA]j0 ໟBǍ4ehnǡ$jjjǩY)%7$I|utŔ^>7AjtϢPK!H[snakipy-0.0.2.dist-info/RECORDuλ@|hK0"V$ ݈2om:kbt]}vLSRvd:|sKﺜ`^wĮN.} [c0~F?D%aL]U% /mF2c=w#MD(N.U"1mCj@װ.0a^֊ { cD-tcOcE r+{E|o^ yQ)WhF=s2֍M