concurrent actor decisions

main
Nekkowe! 2023-10-01 01:45:10 +02:00
parent 6111f69c48
commit 8798de1a83
2 changed files with 97 additions and 65 deletions

View File

@@ -8,12 +8,13 @@ STEPS = 1000
np.random.seed(12345)
fig, ax = plt.subplots()
env = Environment((40, 30))
env = Environment((100, 100))
first_agent = CAMSReverseAndSidestepAgent(
environment=env,
position=(20,15),
initial_direction=Direction.NORTH
position=(50,50),
initial_direction=Direction.NORTH,
required_resources=300
)
im = ax.imshow(env.render(), aspect="equal", origin="lower")

155
sim.py
View File

@@ -1,6 +1,7 @@
import numpy as np
from enum import Enum, auto
import matplotlib.pyplot as plt
from dataclasses import dataclass
class Direction(Enum):
NORTH = (0, 1)
@@ -8,10 +9,6 @@ class Direction(Enum):
WEST = (-1, 0)
EAST = (1, 0)
class CellState(Enum):
    # Walkability of a grid cell: OPEN is traversable, WALL blocks movement.
    OPEN = auto()
    WALL = auto()
class Colours(Enum):
WALL = [30,30,30]
STOCKED = [200,150,0]
@@ -19,23 +16,24 @@ class Colours(Enum):
UNDEFINED = [255,0,255]
AGENT = [0,255,0]
class Cell():
def __init__(self, state, resources):
self.state = state
self.resources = resources
@dataclass
class Observations:
obstacles: np.ndarray
resources: np.ndarray
agents: np.ndarray
def has_obstacle(self, position):
    # True when the obstacle map marks this (x, y) cell as blocked (value 1).
    return self.obstacles[tuple(position)] == 1
def has_agent(self, position):
    # True when the per-step agent map marks this (x, y) cell as occupied (value 1).
    return self.agents[tuple(position)] == 1
def get_colour(self):
    """Map this cell's state and resource level to a Colours member.

    Returns:
        Colours.WALL for wall cells; for open cells, Colours.STOCKED when
        resources remain, Colours.DEPLETED when they are exactly zero;
        Colours.UNDEFINED for any state/value this mapping does not cover.
    """
    if self.state is CellState.WALL:
        return Colours.WALL
    elif self.state is CellState.OPEN:
        if self.resources > 0:
            return Colours.STOCKED
        elif self.resources == 0:
            return Colours.DEPLETED
        else:
            # Negative resources should not occur; flag visually.
            return Colours.UNDEFINED
    else:
        return Colours.UNDEFINED
def has_resources(self, position):
    # True when the resource map holds a positive amount at this (x, y) cell.
    return self.resources[tuple(position)] > 0
def get_resources(self, position):
    # Amount of resources currently stored at this (x, y) cell.
    return self.resources[tuple(position)]
class Environment:
def __init__(self, shape):
@@ -46,41 +44,66 @@ class Environment:
np.random.normal(loc=50, scale=10, size=shape)
)
cols, rows = shape
self.cells = [[] for _ in range(rows)]
for y in range(rows):
for x in range(cols):
if (x == 0 or y == 0 or x+1==cols or y+1==rows):
self.cells[y].append(Cell(CellState.WALL, 0))
else:
self.cells[y].append(Cell(CellState.OPEN, self.resource_map[x,y]))
self.obstacle_map = np.hstack((
np.ones((shape[0], 1)),
np.vstack((
np.ones((1, shape[1]-2)),
np.zeros((shape[0]-2, shape[1]-2)),
np.ones((1, shape[1]-2))
)),
np.ones((shape[0], 1))
))
print(self.obstacle_map)
print(self.obstacle_map.shape)
def step(self):
agent_map = np.zeros(self.shape)
for agent in self.agents:
agent.step()
agent_map[tuple(agent.position)] = 1
observations = Observations(
obstacles=self.obstacle_map,
resources=self.resource_map,
agents=agent_map
)
for agent in self.agents:
agent.step(observations)
def render(self):
cols, rows = self.shape
pixel_data = [[] for _ in range(rows)]
for y in range(self.shape[1]):
for x in range(self.shape[0]):
#print((x, y, self.cell((x,y)).state, self.cell((x,y)).get_colour().value))
pixel_data[y].append(self.cell((x,y)).get_colour().value)
pixel_data = np.zeros(self.shape + (3,))
print(pixel_data.shape)
pixel_data += (
((self.obstacle_map[..., np.newaxis] == 1) * np.array(Colours.WALL.value))
+ ((self.obstacle_map[..., np.newaxis] == 0) * (self.resource_map[..., np.newaxis] == 0) * np.array(Colours.DEPLETED.value))
+ ((self.obstacle_map[..., np.newaxis] == 0) * (self.resource_map[..., np.newaxis] > 0) * np.array(Colours.STOCKED.value))
)
print(pixel_data)
print(pixel_data.shape)
for agent in self.agents:
x, y = agent.position
pixel_data[y][x] = Colours.AGENT.value
pixel_data[tuple(agent.position)] = agent.get_colour()
return np.array(pixel_data)
return np.array(pixel_data).astype(np.uint8).swapaxes(0, 1)
def eat(self, position):
    # Harvest the cell: return whatever resources are stored at `position`
    # and zero the cell so the same resources cannot be collected twice.
    resources = self.resource_map[tuple(position)]
    self.resource_map[tuple(position)] = 0
    return resources
def cell(self, position):
x, y = tuple(position)
return self.cells[y][x]
def is_wall(self, position):
x, y = tuple(position)
return self.cells[y][x].state == CellState.WALL
def has_obstacle(self, position):
    # True when the environment's obstacle map marks this (x, y) cell as blocked.
    return self.obstacle_map[tuple(position)] == 1
def has_agent(self, position):
    # True when any registered agent currently stands on this position.
    # NOTE(review): rebuilds the position set on every call — acceptable
    # for small agent counts, O(n) per query otherwise.
    return tuple(position) in {tuple(agent.position) for agent in self.agents}
def register_agent(self, agent):
    # Add the agent to the environment's step/render bookkeeping.
    self.agents.append(agent)
@@ -94,17 +117,22 @@ class Agent:
self.environment = environment
self.environment.register_agent(self)
def step(self):
def step(self, _observations):
pass
def move(self, direction, respect_walls=True):
def move(self, direction, respect_obstacles=True, respect_agents=True):
new_position = self.position + direction.value
if respect_walls and self.environment.is_wall(new_position):
if respect_obstacles and self.environment.has_obstacle(new_position):
return False
elif respect_agents and self.environment.has_agent(new_position):
return False
else:
self.position = new_position
return True
def get_colour(self):
    # Default render colour for an agent, as a plain [r, g, b] list.
    return Colours.AGENT.value
def die(self):
    # Remove this agent from the simulation by delegating to the
    # environment's unregister_agent (defined elsewhere in the file).
    self.environment.unregister_agent(self)
@@ -113,16 +141,14 @@ class DirectionalAgent(Agent):
super().__init__(environment, position)
self.direction = initial_direction
def get_cell_in_front(self):
position_in_front = self.position + self.direction.value
return self.environment.cell(position_in_front)
def get_position_in_front(self):
    # Position one cell ahead in the current facing direction
    # (direction.value is a unit (dx, dy) offset).
    return self.position + self.direction.value
def wall_in_front(self):
position_in_front = self.position + self.direction.value
return self.environment.is_wall(position_in_front)
def obstacle_in_front(self, observations):
    # True when the observed obstacle map blocks the cell directly ahead.
    return observations.has_obstacle(self.get_position_in_front())
def agent_in_front(self):
return False
def agent_in_front(self, observations):
    # True when the observed agent map shows an agent in the cell directly ahead.
    return observations.has_agent(self.get_position_in_front())
def move_forward(self):
    # Attempt one step in the current facing direction;
    # returns move()'s success flag.
    return self.move(self.direction)
@@ -173,12 +199,12 @@ class CAMSReverseAndSidestepAgent(DirectionalAgent):
self.required_resources = required_resources
self.number_of_turns = 0
def step(self):
def step(self, observations):
self.eat()
if self.wall_in_front():
# self.die()
#elif self.agent_in_front():
if self.obstacle_in_front(observations):
self.die()
elif self.agent_in_front(observations):
if self.number_of_turns == 0:
self.reverse_direction()
self.move_forward()
@@ -187,12 +213,12 @@ class CAMSReverseAndSidestepAgent(DirectionalAgent):
self.turn_right()
self.move_forward()
self.turn_right()
self.number_of_turns == 0
self.number_of_turns = 0
elif self.resources >= self.required_resources:
self.resources -= self.required_resources
new_agent = CAMSReverseAndSidestepAgent(
environment=self.environment,
position=(self.position + self.direction.value),
position=self.position,
initial_direction=self.direction,
required_resources=self.required_resources
)
@@ -201,15 +227,20 @@ class CAMSReverseAndSidestepAgent(DirectionalAgent):
self.move_forward()
def eat(self):
cell = self.environment.cell(self.position)
resources = cell.resources
resources = self.environment.eat(self.position)
if resources > 0:
self.resources += resources
cell.resources = 0
return True
else:
return False
def get_colour(self):
    """Return this agent's render colour as a plain [r, g, b] list.

    Green while travelling normally (no sidestep in progress), red while
    mid-sidestep (one turn taken), magenta for any unexpected turn count.

    Bug fix: the fallback previously returned the Colours.UNDEFINED enum
    member itself, whereas every other branch — and Agent.get_colour —
    returns a list suitable for direct assignment into the numpy pixel
    buffer; return its .value instead.
    """
    if self.number_of_turns == 0:
        return [0, 255, 0]  # green: moving normally
    elif self.number_of_turns == 1:
        return [255, 0, 0]  # red: in the middle of a sidestep
    else:
        return Colours.UNDEFINED.value  # magenta: should-not-happen state