# Grid-world agent simulation: environment, rendering, and agent behaviours.
import numpy as np
|
|
from enum import Enum
|
|
from dataclasses import dataclass
|
|
|
|
class Direction(Enum):
    """Unit step vectors for the four cardinal grid directions.

    Each value is an (x, y) offset added to an agent position: east is +x
    and north is +y (see Agent.move / DirectionalAgent.get_position_in_front).
    """

    NORTH = (0, 1)
    SOUTH = (0, -1)
    WEST = (-1, 0)
    EAST = (1, 0)
|
|
|
|
class Colours(Enum):
    """RGB colours (0-255 component lists) used by Environment.render.

    NOTE(review): values are mutable lists shared via ``.value`` — callers
    must not mutate them in place; consider tuples if that ever matters.
    """

    WALL = [30, 30, 30]
    STOCKED = [240, 170, 0]
    DEPLETED = [77, 61, 41]
    UNDEFINED = [255, 0, 255]
    AGENT = [0, 255, 0]
|
|
|
|
@dataclass
class Observations:
    """Per-tick snapshot of the world handed to every agent.

    Attributes:
        obstacles: 2D array, 1 where a cell is a wall.
        resources: 2D array of resource quantities per cell.
        agents: 2D array, 1 where a cell is occupied by an agent.
    """

    obstacles: np.ndarray
    resources: np.ndarray
    agents: np.ndarray

    def has_obstacle(self, position):
        """Return True when the cell at *position* is a wall."""
        cell = tuple(position)
        return self.obstacles[cell] == 1

    def has_agent(self, position):
        """Return True when an agent occupies the cell at *position*."""
        cell = tuple(position)
        return self.agents[cell] == 1

    def has_resources(self, position):
        """Return True when the cell at *position* still holds resources."""
        return self.get_resources(position) > 0

    def get_resources(self, position):
        """Return the resource quantity stored at *position*."""
        return self.resources[tuple(position)]
|
|
|
|
|
|
class Environment:
    """Rectangular grid world with a wall border, a resource field, and agents.

    ``shape`` is an (x, y) = (width, height) pair; cell positions index the
    internal maps as (x, y). Agents add themselves via ``register_agent`` and
    are driven once per tick by ``step``.
    """

    def __init__(self, shape):
        """Create a world of the given (width, height) shape.

        Interior cells start with resources drawn from N(50, 10) and rounded;
        the outermost ring of cells is solid wall.
        """
        self.shape = shape
        self.agents = []

        # Initial stock per cell. NOTE(review): the normal draw can produce
        # negative values on rare occasions — confirm whether clipping at 0
        # is wanted.
        self.resource_map = np.round(
            np.random.normal(loc=50, scale=10, size=shape)
        )

        # Wall ring: left column | (top row / open interior / bottom row) |
        # right column, assembled to exactly match `shape`.
        self.obstacle_map = np.hstack((
            np.ones((shape[0], 1)),
            np.vstack((
                np.ones((1, shape[1] - 2)),
                np.zeros((shape[0] - 2, shape[1] - 2)),
                np.ones((1, shape[1] - 2))
            )),
            np.ones((shape[0], 1))
        ))

    def step(self):
        """Advance the simulation one tick.

        Builds a fresh occupancy map, then hands every agent the same
        Observations snapshot and asks it to act.
        """
        agent_map = np.zeros(self.shape)
        for agent in self.agents:
            agent_map[tuple(agent.position)] = 1

        observations = Observations(
            obstacles=self.obstacle_map,
            resources=self.resource_map,
            agents=agent_map
        )

        # Iterate over a snapshot: agents may die (unregister) or reproduce
        # (register) during their step, which would otherwise mutate the
        # list mid-iteration and skip or double-step agents.
        for agent in list(self.agents):
            agent.step(observations)

    def render(self):
        """Render the world to an RGB uint8 array of shape (height, width, 3)."""
        pixel_data = np.zeros(self.shape + (3,))

        # Compose per-cell colour from mutually exclusive masked terms:
        # walls, depleted floor, and stocked floor (brightness scales with
        # remaining stock).
        pixel_data += (
            (
                (self.obstacle_map[..., np.newaxis] == 1)  # Flag: wall
                * np.array(Colours.WALL.value)  # Colour
            )
            + (
                (self.obstacle_map[..., np.newaxis] == 0)  # Flag: no wall
                * (self.resource_map[..., np.newaxis] == 0)  # Flag: depleted
                * np.array(Colours.DEPLETED.value)  # Colour
            )
            + (
                (
                    # Flags for applying the "cell is stocked" colour component
                    (self.obstacle_map[..., np.newaxis] == 0)  # Flag: no wall
                    * (self.resource_map[..., np.newaxis] > 0)  # Flag: not depleted
                ) * (
                    # Remap resource values to a brightness factor so relative
                    # stock is visible
                    self.resource_map[..., np.newaxis]  # Resource values
                    / 100  # Remap of resource values to (vaguely) 0-1 float range
                    * 0.4  # Compress the 0-1 range to 0-0.4
                    + 0.6  # Raise the baseline: factor ends up (vaguely) 0.6-1
                )
                * np.array(Colours.STOCKED.value)  # Colour
            )
        )

        # Agents draw on top of the terrain.
        for agent in self.agents:
            pixel_data[tuple(agent.position)] = agent.get_colour()

        # swapaxes converts (x, y, rgb) to image-style (row, column, rgb).
        return pixel_data.astype(np.uint8).swapaxes(0, 1)

    def eat(self, position):
        """Remove and return all resources stored at *position*."""
        resources = self.get_resources(position)
        self.set_resources(position, 0)
        return resources

    def has_obstacle(self, position):
        """Return True when *position* is a wall cell."""
        return self.obstacle_map[tuple(position)] == 1

    def has_agent(self, position):
        """Return True when any registered agent occupies *position*."""
        return tuple(position) in {tuple(agent.position) for agent in self.agents}

    def get_resources(self, position):
        """Return the resource quantity stored at *position*."""
        return self.resource_map[tuple(position)]

    def set_resources(self, position, resources):
        """Set the resource quantity stored at *position*.

        Bug fix: this previously ignored *resources* and always wrote 0.
        """
        self.resource_map[tuple(position)] = resources

    def register_agent(self, agent):
        """Add *agent* to the simulation."""
        self.agents.append(agent)

    def unregister_agent(self, agent):
        """Remove *agent* from the simulation."""
        self.agents.remove(agent)
|
|
|
|
class Agent:
    """Base grid inhabitant; registers itself with its environment on creation."""

    def __init__(self, environment, position):
        self.environment = environment
        self.position = np.array(position)
        self.environment.register_agent(self)

    def step(self, _observations):
        """Per-tick hook; the base agent does nothing."""
        pass

    def move(self, direction, respect_obstacles=True, respect_agents=True):
        """Attempt to move one cell in *direction*.

        Returns True and updates the position on success; returns False
        without moving when the target cell is blocked by a wall
        (unless ``respect_obstacles`` is False) or by another agent
        (unless ``respect_agents`` is False).
        """
        target = self.position + direction.value
        if respect_obstacles and self.environment.has_obstacle(target):
            return False
        if respect_agents and self.environment.has_agent(target):
            return False
        self.position = target
        return True

    def get_colour(self):
        """RGB colour the renderer uses for this agent."""
        return Colours.AGENT.value

    def die(self):
        """Remove this agent from its environment's registry."""
        self.environment.unregister_agent(self)
|
|
|
|
class DirectionalAgent(Agent):
    """Agent with a facing direction that can turn and move forward.

    The stray ``NORTH``/``SOUTH``/``WEST``/``EAST`` class attributes that
    duplicated the ``Direction`` enum values were dead copy-paste code and
    have been removed; use ``Direction`` members instead.
    """

    def __init__(self, environment, position, initial_direction):
        """Create the agent at *position*, facing *initial_direction*
        (a ``Direction`` member)."""
        super().__init__(environment, position)
        self.direction = initial_direction

    def get_position_in_front(self):
        """Grid position of the cell directly ahead of the agent."""
        return self.position + self.direction.value

    def obstacle_in_front(self, observations):
        """Return True when the cell ahead is a wall."""
        return observations.has_obstacle(self.get_position_in_front())

    def agent_in_front(self, observations):
        """Return True when the cell ahead is occupied by an agent."""
        return observations.has_agent(self.get_position_in_front())

    def move_forward(self):
        """Attempt to step into the cell ahead; return True on success."""
        return self.move(self.direction)

    def turn_left(self):
        """Rotate the facing direction 90 degrees counter-clockwise.

        Raises ValueError when the current facing does not map onto a
        ``Direction`` member (same contract as the previous if/elif chain).
        """
        # 90-degree CCW rotation in this coordinate system: (x, y) -> (-y, x).
        x, y = self.direction.value
        self.direction = Direction((-y, x))

    def turn_right(self):
        """Rotate the facing direction 90 degrees clockwise.

        Raises ValueError when the current facing does not map onto a
        ``Direction`` member.
        """
        # 90-degree CW rotation: (x, y) -> (y, -x).
        x, y = self.direction.value
        self.direction = Direction((y, -x))

    def reverse_direction(self):
        """Flip the facing direction 180 degrees."""
        x, y = self.direction.value
        self.direction = Direction((-x, -y))
|
|
|
|
class CAMSReverseAndSidestepAgent(DirectionalAgent):
    """Foraging agent that reverses, then sidesteps, to resolve blockages.

    Each tick the agent eats the resources under it. Driving into a wall is
    fatal. When blocked by another agent it first reverses and retreats one
    cell; if still blocked on the next tick it performs a right-hand
    sidestep. Once its stockpile reaches ``required_resources`` it spawns a
    copy of itself on its own cell and moves away using the same
    reverse-then-sidestep policy.
    """

    def __init__(
            self,
            environment,
            position,
            initial_direction,
            required_resources=500
    ):
        """Create the agent facing *initial_direction*.

        required_resources: stockpile that must be accumulated before the
        agent reproduces; spent in full on each offspring.
        """
        super().__init__(environment, position, initial_direction)
        self.resources = 0
        self.required_resources = required_resources
        # 0 = travelling normally; >0 = mid collision-avoidance manoeuvre
        # (reversed last tick; a sidestep follows if still blocked).
        self.number_of_turns = 0

    def step(self, observations):
        """Eat, then act on whatever is directly ahead."""
        self.eat()

        if self.obstacle_in_front(observations):
            # Walls are fatal to this agent.
            self.die()
        elif self.agent_in_front(observations):
            if self.number_of_turns == 0:
                # First blockage: reverse and retreat one cell.
                self.reverse_direction()
                self.move_forward()
                self.number_of_turns += 1
            else:
                # Still blocked after reversing: sidestep instead.
                self.sidestep_and_reset_counter()
        elif self.resources >= self.required_resources:
            self.resources -= self.required_resources
            # The offspring registers itself with the environment in its
            # constructor, so the instance need not be retained here.
            CAMSReverseAndSidestepAgent(
                environment=self.environment,
                position=self.position,
                initial_direction=self.direction,
                required_resources=self.required_resources
            )
            # Move away from the newborn sharing this cell, reusing the
            # reverse-then-sidestep policy.
            if self.number_of_turns == 0:
                self.reverse_direction()
                self.number_of_turns += 1
            else:
                self.sidestep_and_reset_counter()
        else:
            self.move_forward()

    def sidestep_and_reset_counter(self):
        """Shift one cell to the right of the current heading.

        Two right turns bracket one forward move, so after a prior reversal
        this restores the original heading offset by one lane. The manoeuvre
        counter is cleared either way.
        """
        self.turn_right()
        self.move_forward()
        self.turn_right()
        self.number_of_turns = 0

    def eat(self):
        """Consume all resources on the current cell.

        Returns True when anything was gained, False on a depleted cell.
        """
        gained = self.environment.eat(self.position)

        if gained > 0:
            self.resources += gained
            return True
        else:
            return False

    def get_colour(self):
        """Green while travelling, red mid-manoeuvre, magenta otherwise."""
        if self.number_of_turns == 0:
            return Colours.AGENT.value
        elif self.number_of_turns == 1:
            return [255, 0, 0]
        else:
            # Bug fix: previously returned the Colours.UNDEFINED enum member
            # itself rather than its RGB list, which the renderer cannot
            # assign into the pixel array.
            return Colours.UNDEFINED.value
|