tfwr/mazelib.py

456 lines
12 KiB
Python

from __builtins__ import *
from __utils__ import *
import drone
import farm
# Grid offset (dx, dy) applied to a position when stepping in each direction.
# The key order here is also the order in which probe_all() tests directions.
DIRS = {
    North: (0, 1),
    South: (0, -1),
    East: (1, 0),
    West: (-1, 0),
}
# Map of the maze discovered so far: (x, y) -> cell record dict with the keys
# 'openings', 'probed' and 'neighbours' (records are created by ensure_cell()).
KNOWN = {}
def plant_maze(size: int):
    # Plant a bush under the drone and apply weird substance to it.
    #
    # size: scales the amount of weird substance used; the amount is
    #       size * 2 ** (number of Mazes unlocks - 1).
    #
    # NOTE(review): drone.harvest_except is a project helper not shown here;
    # presumably it clears the tile unless it already holds a bush - confirm.
    maze_level = num_unlocked(Unlocks.Mazes)
    dose = size * 2 ** (maze_level - 1)
    drone.harvest_except(Entities.Bush)
    plant(Entities.Bush)
    use_item(Items.Weird_Substance, dose)
def wipe_known():
    # Forget the whole discovered maze map.
    #
    # This rebinds the module-level name to a fresh dict instead of emptying
    # the existing one, so a dict obtained earlier through all_cells() keeps
    # its old contents and is no longer the live map.
    global KNOWN
    KNOWN = {}
def _harvest_treasure():
    # Harvest whatever is under the drone and log how much gold was gained.
    gold_before = num_items(Items.Gold)
    harvest()
    quick_print("Treasure gained: ", num_items(Items.Gold) - gold_before)


def do_maze(size: int, repeat: int, solve_fun=None, wipe_every: int = None):
    # Plant a maze, solve it, and optionally re-use it several times before
    # harvesting the treasure.
    #
    # size:       passed to plant_maze() and used to compute the amount of
    #             weird substance for every re-use.
    # repeat:     falsy -> harvest after the first solve; otherwise the number
    #             of solves after which the treasure is cashed in.
    # solve_fun:  callable returning (success, ticks); defaults to
    #             astar_solver.
    # wipe_every: forget the known map every this many solves. None or 0
    #             means the map is never wiped between solves.
    #
    # Returns None in every case. If the solver reports failure the function
    # stops without harvesting.
    if solve_fun == None:
        solve_fun = astar_solver
    loops = 0
    wipe_known()
    quick_print("do_maze size: ", size)
    plant_maze(size)
    if get_entity_type() == Entities.Treasure:
        quick_print("Standing on treasure at start, harvesting")
        _harvest_treasure()
        return
    while True:
        success, ticks = solve_fun()
        if not success:
            break
        loops += 1
        # NOTE(review): 400 looks like a base ticks-per-second rate that the
        # Speed unlocks multiply - confirm against the game's timing rules.
        seconds = ticks / 400 / (num_unlocked(Unlocks.Speed) + 1)
        print("Solved in: ", seconds, " seconds. Loop: ", loops)
        if not repeat:
            _harvest_treasure()
            return
        if loops >= repeat:
            quick_print("Too many loops, cashing in")
            _harvest_treasure()
            return
        substance_needed = size * 2 ** (num_unlocked(Unlocks.Mazes) - 1)
        if substance_needed > num_items(Items.Weird_Substance):
            quick_print("Not enough substance to repeat maze")
            _harvest_treasure()
            return
        # wipe_every defaults to None; taking the modulo unguarded would fail
        # on the first repeated solve (and divide by zero for 0).
        if wipe_every and loops % wipe_every == 0:
            wipe_known()
        # Spend the substance while standing on the treasure to go again.
        use_item(Items.Weird_Substance, substance_needed)
def at_exit():
    # True when the drone is standing on the treasure.
    # Kept as two separate return statements (instead of returning the
    # comparison directly) so a breakpoint can sit on the win condition.
    if get_entity_type() != Entities.Treasure:
        return False
    return True
def ensure_cell(x, y):
    # Return the KNOWN record for (x, y), creating an empty one first if the
    # position has never been seen. A new record has no openings, is not
    # probed, and has no cached neighbour list.
    key = (x, y)
    if key in KNOWN:
        return KNOWN[key]
    fresh = {'openings': set(), 'probed': False, 'neighbours': None}
    KNOWN[key] = fresh
    return fresh
def record_passage(x, y, d):
    # Record that the cell at (x, y) is open in direction d, and that the
    # adjacent cell is open in the opposite direction. Both cells are created
    # if needed and both cached neighbour lists are invalidated.
    step_x, step_y = DIRS[d]
    here = ensure_cell(x, y)
    there = ensure_cell(x + step_x, y + step_y)
    here['openings'].add(d)
    there['openings'].add(opposite(d))
    # The openings changed, so the lists cached by neighbours() are stale.
    here['neighbours'] = None
    there['neighbours'] = None
def probe_all(x, y):
    # Test every direction with can_move() and record the open ones for the
    # cell (x, y), then mark the cell as probed. Does nothing if the cell is
    # already probed.
    #
    # can_move() is asked about the drone's current tile, so callers pass the
    # drone's own position as (x, y).
    cell = ensure_cell(x, y)
    if not cell['probed']:
        for heading in DIRS:
            if can_move(heading):
                record_passage(x, y, heading)
        cell['probed'] = True
def neighbours(x, y):
    # Return a list of (direction, (nx, ny)) pairs, one per recorded opening
    # of the cell at (x, y). The list is cached on the cell record until
    # record_passage() resets it. The cell must already be in KNOWN.
    cell = KNOWN[(x, y)]
    cached = cell['neighbours']
    if cached == None:
        cached = []
        for heading in cell['openings']:
            offset = DIRS[heading]
            cached.append((heading, (x + offset[0], y + offset[1])))
        cell['neighbours'] = cached
    return cached
def all_cells():
    # Return the live map dict itself (not a copy): (x, y) -> cell record.
    return KNOWN
def frontier_cells():
    # Return the positions of all cells in KNOWN that have not been probed
    # yet, in KNOWN's iteration order. Logs the tick count at entry and exit.
    tick_start = get_tick_count()
    unprobed = []
    for pos in KNOWN:
        if KNOWN[pos]['probed']:
            continue
        unprobed.append(pos)
    quick_print("frontier entry: ", tick_start, " exit: ", get_tick_count())
    return unprobed
def manhattan(a, b):
    # Manhattan (taxicab) distance between two positions, each indexable as
    # [0] = x and [1] = y.
    dx = a[0] - b[0]
    dy = a[1] - b[1]
    return abs(dx) + abs(dy)
def a_star(start, goal):
    # A* search over the passages recorded in KNOWN, unit cost per step and
    # Manhattan distance as the heuristic.
    #
    # Returns the list of directions in REVERSE order (last element is the
    # first step to take), [] when start == goal, or None when goal cannot be
    # reached through the known passages.
    tick_start = get_tick_count()
    if start == goal:
        return []
    f_of = {start: manhattan(start, goal)}  # open positions -> f score
    came_from = {}                          # pos -> (previous pos, direction)
    cost_to = {start: 0}                    # pos -> best known step count
    done = set()
    while f_of:
        # Linear scan for the open position with the lowest f score; ties go
        # to the position that entered the open set first.
        best = None
        best_f = None
        for candidate in f_of:
            candidate_f = f_of[candidate]
            if best_f == None or candidate_f < best_f:
                best = candidate
                best_f = candidate_f
        if best == goal:
            path = reconstruct_path(came_from, best)
            quick_print("a* entry: ", tick_start, " exit: ", get_tick_count())
            return path
        f_of.pop(best)
        done.add(best)
        step_cost = cost_to[best] + 1
        for entry in neighbours(best[0], best[1]):
            d, nxt = entry
            if nxt in done:
                continue
            # Only a strictly cheaper route replaces a known one.
            if nxt in cost_to and cost_to[nxt] <= step_cost:
                continue
            came_from[nxt] = (best, d)
            cost_to[nxt] = step_cost
            f_of[nxt] = step_cost + manhattan(nxt, goal)
    return None
def reconstruct_path(came_from, end):
    # Follow the came_from links back from `end` and collect the direction of
    # every step. came_from maps pos -> (previous pos, direction taken).
    #
    # The result is backwards: path[-1] is the first step, path[0] the last.
    tick_start = get_tick_count()
    path = []
    node = end
    while node in came_from:
        previous, d = came_from[node]
        path.append(d)
        node = previous
    quick_print("reconstruct_path entry: ", tick_start, " exit: ", get_tick_count())
    return path
def astar_solver() -> tuple[bool, int]:
    # Walk the drone to the treasure, mapping the maze into KNOWN on the way.
    #
    # Each round plans with a_star() to the exit if the exit cell is already
    # in KNOWN and reachable; otherwise to the frontier cell chosen by
    # pick_frontier(). The plan is walked until the drone enters a cell that
    # had not been probed before, then a new plan is made.
    #
    # Returns (True, elapsed ticks) on success, (False, 0) when no frontier
    # is left or the chosen frontier cannot be reached.
    tick_start = get_tick_count()
    cursor = drone.get_loc()
    exit_pos = measure()
    quick_print("explore_to_exit start: ", cursor, " exit: ", exit_pos)
    ensure_cell(cursor[0], cursor[1])
    probe_all(cursor[0], cursor[1])
    steps = 0
    while not at_exit():
        # Prefer a direct plan to the exit through the known graph.
        path = None
        if exit_pos in KNOWN:
            path = a_star(cursor, exit_pos)
        if path == None:
            # Exit not reachable yet: head for an unprobed frontier instead.
            target = pick_frontier(cursor, exit_pos)
            quick_print("No path to exit; frontier target: ", target, " known cells: ", len(KNOWN))
            if target == None:
                quick_print("No frontier left, giving up")
                return False, 0
            path = a_star(cursor, target)
            if path == None:
                quick_print("Frontier ", target, " unreachable from ", cursor)
                return False, 0
        # The plan is stored backwards, so consume it from the end.
        remaining = len(path)
        while remaining > 0 and not at_exit():
            remaining -= 1
            d = path[remaining]
            move(d)
            steps += 1
            cursor = drone.get_loc()
            pos = (cursor[0], cursor[1])
            was_new = pos not in KNOWN or not KNOWN[pos]['probed']
            probe_all(cursor[0], cursor[1])
            quick_print("step ", steps, " moved ", d, " to ", cursor, " plan_len: ", len(path), " new: ", was_new)
            if was_new:
                # Probing may have revealed a shortcut, so plan again.
                break
    quick_print("Reached exit at ", cursor, " in ", steps, " steps")
    quick_print("explore_to_exit entry: ", tick_start, " exit: ", get_tick_count())
    return True, get_tick_count() - tick_start
def pick_frontier(cursor, exit_pos):
    # Choose the unprobed cell with the smallest
    # manhattan(cursor, cell) + manhattan(cell, exit_pos).
    # Ties go to the first such cell returned by frontier_cells().
    # Returns None when there is no unprobed cell.
    chosen = None
    lowest = None
    for candidate in frontier_cells():
        detour = manhattan(cursor, candidate) + manhattan(candidate, exit_pos)
        if lowest != None and detour >= lowest:
            continue
        lowest = detour
        chosen = candidate
    return chosen
# Direction reached by turning 90 degrees left / right from each heading;
# used by left_hand_solve().
LEFT_OF = {North: West, West: South, South: East, East: North}
RIGHT_OF = {North: East, East: South, South: West, West: North}
def left_hand_solve()-> tuple[bool, int]:
    # Wall-following solver: at every cell try left, then straight, then
    # right, and turn around only in a dead end. Does not use KNOWN.
    #
    # Returns (True, elapsed ticks) once the drone stands on the treasure.
    # NOTE(review): wall following is only guaranteed to reach the goal in a
    # maze without loops; there is no step limit here - confirm that is
    # acceptable for re-used mazes.
    tick_start = get_tick_count()
    facing = North  # arbitrary starting orientation
    while not at_exit():
        left = LEFT_OF[facing]
        right = RIGHT_OF[facing]
        back = opposite(facing)
        if can_move(left):
            facing = left
        elif not can_move(facing):
            # Straight ahead is blocked as well: try right, else turn around.
            if can_move(right):
                facing = right
            else:
                facing = back
        move(facing)
    return True, get_tick_count() - tick_start
def _frontier_task(target, step_budget):
    # Build a zero-argument function that explores one frontier.
    # The arguments are bound here, per call, so every spawned drone keeps
    # its own target even though the caller's loop variable moves on.
    def task():
        return explore_frontier(target, step_budget)
    return task


def astar_solver_parallel() -> tuple[bool, int]:
    # Like astar_solver(), but when the exit is not reachable through the
    # known graph the unprobed frontier cells are explored by several drones
    # at once and their findings are merged into KNOWN.
    #
    # Returns (True, elapsed ticks) once the main drone stands on the
    # treasure, (False, 0) when no frontier cell is left to explore.
    tick_start = get_tick_count()
    cursor = drone.get_loc()
    exit_pos = measure()
    ensure_cell(cursor[0], cursor[1])
    probe_all(cursor[0], cursor[1])
    # Total steps an explorer may take, INCLUDING the walk to its frontier
    # cell (explore_frontier starts counting at the length of that walk).
    step_budget = 20
    while not at_exit():
        # Try to plan straight to the exit.
        path = None
        if exit_pos in KNOWN:
            path = a_star(cursor, exit_pos)
        if path != None:
            # Exploit the known route; the plan is stored backwards.
            i = len(path) - 1
            while i >= 0 and not at_exit():
                move(path[i])
                cursor = drone.get_loc()
                pos = (cursor[0], cursor[1])
                was_new = pos not in KNOWN or not KNOWN[pos]['probed']
                probe_all(cursor[0], cursor[1])
                if was_new:
                    break  # re-plan: probing may have revealed a shortcut
                i = i - 1
            continue
        # No path to the exit: expand the frontier in parallel.
        frontiers = frontier_cells()
        if len(frontiers) == 0:
            return False, 0
        # Rank frontiers by promise: closer to the exit is better.
        ranked = []
        for f in frontiers:
            ranked.append((manhattan(f, exit_pos), f))
        # Simple insertion sort (no built-in sort assumed).
        i = 1
        while i < len(ranked):
            j = i
            while j > 0 and ranked[j][0] < ranked[j - 1][0]:
                tmp = ranked[j]
                ranked[j] = ranked[j - 1]
                ranked[j - 1] = tmp
                j = j - 1
            i = i + 1
        # Hand the most promising frontiers to new drones. The first frontier
        # that cannot get a drone is kept for the main drone; a second failed
        # spawn ends the hand-out.
        drones = []
        main_target = None
        for entry in ranked:
            target = entry[1]
            # spawn_drone is given ONLY the function to run; the arguments
            # travel inside the closure instead of as extra parameters.
            # NOTE(review): per the game documentation spawn_drone accepts a
            # single function - confirm for the game version in use.
            handle = spawn_drone(_frontier_task(target, step_budget))
            if handle:
                drones.append(handle)
            elif main_target == None:
                main_target = target
            else:
                break  # out of slots
        # The main drone explores too, unless every frontier got a drone.
        if main_target != None:
            merge_discoveries(explore_frontier(main_target, step_budget))
        # Collect the results of the spawned drones in spawn order.
        for handle in drones:
            merge_discoveries(wait_for(handle))
        # The main drone may have moved while exploring.
        cursor = drone.get_loc()
    return True, get_tick_count() - tick_start
def merge_discoveries(discoveries):
    # Fold a list of (pos, openings) pairs, as returned by
    # explore_frontier(), into KNOWN. Every listed cell ends up marked as
    # probed; every opening not yet recorded is added in both directions.
    for found in discoveries:
        pos, openings = found
        cell = ensure_cell(pos[0], pos[1])
        for d in openings:
            if d in cell['openings']:
                continue
            # Adds the passage on both sides, creates the neighbour cell if
            # needed and resets both cached neighbour lists.
            record_passage(pos[0], pos[1], d)
        cell['probed'] = True
def _snapshot_openings(pos):
    # Return (pos, copy of the openings currently recorded for pos).
    copied = set()
    for d in KNOWN[pos]['openings']:
        copied.add(d)
    return (pos, copied)


def explore_frontier(target, step_budget):
    # Walk from the drone's current position to `target` through the known
    # graph, probe it, then keep stepping greedily into unprobed neighbouring
    # cells until step_budget steps (including the walk to the target) have
    # been used or no unprobed neighbour is left.
    #
    # Returns a list of (pos, set of openings) for every cell probed here;
    # the list is empty when target cannot be reached.
    #
    # Per the original author's note, a spawned drone works on its own copy
    # of KNOWN, which is why the findings are returned for merging.
    cursor = drone.get_loc()
    pos = (cursor[0], cursor[1])
    discoveries = []
    # Make sure the starting position is in the local map.
    ensure_cell(pos[0], pos[1])
    path = a_star(pos, target)
    if path == None:
        # Not expected: a frontier cell is always connected to a known cell.
        return discoveries
    # The plan is stored backwards, so consume it from the end.
    remaining = len(path)
    while remaining > 0:
        remaining -= 1
        move(path[remaining])
        cursor = drone.get_loc()
        pos = (cursor[0], cursor[1])
    # Now at the target: probe it and report what was found.
    probe_all(pos[0], pos[1])
    discoveries.append(_snapshot_openings(pos))
    # Opportunistic exploration: greedy walk into unprobed neighbours.
    steps_used = len(path)
    while steps_used < step_budget:
        next_dir = None
        for d in KNOWN[pos]['openings']:
            dx, dy = DIRS[d]
            beyond = (pos[0] + dx, pos[1] + dy)
            # A cell missing from the map counts as unprobed too.
            if beyond not in KNOWN or not KNOWN[beyond]['probed']:
                next_dir = d
                break
        if next_dir == None:
            break  # no unprobed neighbours, stop here
        move(next_dir)
        steps_used += 1
        cursor = drone.get_loc()
        pos = (cursor[0], cursor[1])
        probe_all(pos[0], pos[1])
        discoveries.append(_snapshot_openings(pos))
    return discoveries