456 lines
12 KiB
Python
456 lines
12 KiB
Python
from __builtins__ import *
|
|
from __utils__ import *
|
|
import drone
|
|
import farm
|
|
|
|
DIRS = {
|
|
North: (0, 1),
|
|
South: (0, -1),
|
|
East: (1, 0),
|
|
West: (-1, 0),
|
|
}
|
|
|
|
KNOWN = {}
|
|
|
|
|
|
def plant_maze(size: int):
|
|
substance_needed = size * 2 ** (num_unlocked(Unlocks.Mazes) - 1)
|
|
drone.harvest_except(Entities.Bush)
|
|
plant(Entities.Bush)
|
|
use_item(Items.Weird_Substance, substance_needed)
|
|
|
|
|
|
def wipe_known():
|
|
global KNOWN
|
|
KNOWN = {}
|
|
|
|
|
|
def do_maze(size: int, repeat: int, solve_fun=None, wipe_every: int = None):
|
|
if solve_fun == None:
|
|
solve_fun = astar_solver
|
|
loops = 0
|
|
wipe_known()
|
|
quick_print("do_maze size: ", size)
|
|
plant_maze(size)
|
|
if get_entity_type() == Entities.Treasure:
|
|
quick_print("Standing on treasure at start, harvesting")
|
|
before_treasure = num_items(Items.Gold)
|
|
harvest()
|
|
quick_print("Treasure gained: ", num_items(Items.Gold) - before_treasure)
|
|
return
|
|
while True:
|
|
success, time = solve_fun()
|
|
if success:
|
|
loops += 1
|
|
seconds = time / 400 / (num_unlocked(Unlocks.Speed) + 1)
|
|
print("Solved in: ", seconds, " seconds. Loop: ", loops)
|
|
|
|
if repeat:
|
|
if loops >= repeat:
|
|
quick_print("Too many loops, cashing in")
|
|
before_treasure = num_items(Items.Gold)
|
|
harvest()
|
|
quick_print("Treasure gained: ", num_items(Items.Gold) - before_treasure)
|
|
return
|
|
substance_needed = size * 2 ** (num_unlocked(Unlocks.Mazes) - 1)
|
|
if substance_needed > num_items(Items.Weird_Substance):
|
|
quick_print("Not enough substance to repeat maze")
|
|
before_treasure = num_items(Items.Gold)
|
|
harvest()
|
|
quick_print("Treasure gained: ", num_items(Items.Gold) - before_treasure)
|
|
return
|
|
if loops % wipe_every == 0:
|
|
wipe_known()
|
|
use_item(Items.Weird_Substance, substance_needed)
|
|
else:
|
|
before_treasure = num_items(Items.Gold)
|
|
harvest()
|
|
quick_print("Treasure gained: ", num_items(Items.Gold) - before_treasure)
|
|
return
|
|
else:
|
|
break
|
|
|
|
|
|
def at_exit():
|
|
# Seems silly, but lets me put a breakpoint on the win condition
|
|
if get_entity_type() == Entities.Treasure:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
|
|
def ensure_cell(x, y):
|
|
if (x, y) not in KNOWN:
|
|
KNOWN[(x, y)] = {'openings': set(), 'probed': False, 'neighbours': None}
|
|
return KNOWN[(x, y)]
|
|
|
|
|
|
def record_passage(x, y, d):
|
|
dx, dy = DIRS[d]
|
|
nx, ny = x + dx, y + dy
|
|
a = ensure_cell(x, y)
|
|
b = ensure_cell(nx, ny)
|
|
a['openings'].add(d)
|
|
b['openings'].add(opposite(d))
|
|
a['neighbours'] = None
|
|
b['neighbours'] = None
|
|
|
|
|
|
def probe_all(x, y):
|
|
cell = ensure_cell(x, y)
|
|
if cell['probed']:
|
|
return
|
|
for d in DIRS:
|
|
if can_move(d):
|
|
record_passage(x, y, d)
|
|
cell['probed'] = True
|
|
|
|
|
|
def neighbours(x, y):
|
|
cell = KNOWN[(x, y)]
|
|
if cell['neighbours'] != None:
|
|
return cell['neighbours']
|
|
result = []
|
|
for d in cell['openings']:
|
|
dx, dy = DIRS[d]
|
|
result.append((d, (x + dx, y + dy)))
|
|
cell['neighbours'] = result
|
|
return result
|
|
|
|
|
|
def all_cells():
|
|
return KNOWN
|
|
|
|
|
|
def frontier_cells():
|
|
tick_start = get_tick_count()
|
|
result = []
|
|
for pos in KNOWN:
|
|
cell = KNOWN[pos]
|
|
if not cell['probed']:
|
|
result.append(pos)
|
|
quick_print("frontier entry: ", tick_start, " exit: ", get_tick_count())
|
|
return result
|
|
|
|
|
|
def manhattan(a, b):
|
|
return abs(a[0] - b[0]) + abs(a[1] - b[1])
|
|
|
|
|
|
def a_star(start, goal):
|
|
tick_start = get_tick_count()
|
|
if start == goal:
|
|
return []
|
|
|
|
open_set = {start: manhattan(start, goal)} # pos -> f_score
|
|
came_from = {} # pos -> (prev_pos, direction)
|
|
g_score = {start: 0}
|
|
closed = set()
|
|
|
|
while open_set:
|
|
# Find pos in open_set with lowest f
|
|
current = None
|
|
current_f = None
|
|
for pos in open_set:
|
|
f = open_set[pos]
|
|
if current_f == None or f < current_f:
|
|
current = pos
|
|
current_f = f
|
|
|
|
if current == goal:
|
|
path = reconstruct_path(came_from, current)
|
|
quick_print("a* entry: ", tick_start, " exit: ", get_tick_count())
|
|
return path
|
|
|
|
open_set.pop(current)
|
|
closed.add(current)
|
|
|
|
for entry in neighbours(current[0], current[1]):
|
|
d = entry[0]
|
|
neighbour = entry[1]
|
|
if neighbour in closed:
|
|
continue
|
|
tentative_g = g_score[current] + 1
|
|
if neighbour in g_score:
|
|
best_known = g_score[neighbour]
|
|
else:
|
|
best_known = None
|
|
if best_known == None or tentative_g < best_known:
|
|
came_from[neighbour] = (current, d)
|
|
g_score[neighbour] = tentative_g
|
|
open_set[neighbour] = tentative_g + manhattan(neighbour, goal)
|
|
return None
|
|
|
|
|
|
def reconstruct_path(came_from, end):
|
|
tick_start = get_tick_count()
|
|
path = []
|
|
current = end
|
|
while current in came_from:
|
|
entry = came_from[current]
|
|
prev = entry[0]
|
|
d = entry[1]
|
|
path.append(d)
|
|
current = prev
|
|
quick_print("reconstruct_path entry: ", tick_start, " exit: ", get_tick_count())
|
|
return path # backwards: path[-1] is first step, path[0] is last step
|
|
|
|
def astar_solver() -> tuple[bool, int]:
|
|
tick_start = get_tick_count()
|
|
cursor = drone.get_loc()
|
|
exit_pos = measure()
|
|
quick_print("explore_to_exit start: ", cursor, " exit: ", exit_pos)
|
|
ensure_cell(cursor[0], cursor[1])
|
|
probe_all(cursor[0], cursor[1])
|
|
|
|
steps = 0
|
|
while not at_exit():
|
|
# Try to plan straight to the exit through known graph
|
|
if exit_pos in KNOWN:
|
|
path = a_star(cursor, exit_pos)
|
|
else:
|
|
path = None
|
|
|
|
if path == None:
|
|
# Exit not reachable yet — head to nearest unprobed frontier instead
|
|
target = pick_frontier(cursor, exit_pos)
|
|
quick_print("No path to exit; frontier target: ", target, " known cells: ", len(KNOWN))
|
|
if target == None:
|
|
quick_print("No frontier left, giving up")
|
|
return False, 0 # fully explored, no exit found — shouldn't happen if exit_pos is valid
|
|
path = a_star(cursor, target)
|
|
if path == None:
|
|
quick_print("Frontier ", target, " unreachable from ", cursor)
|
|
return False, 0 # frontier somehow unreachable — bug
|
|
|
|
# Walk the plan. path[-1] is first step, path[0] is last step.
|
|
# Stop early only if we step into an unprobed cell — that may reveal a shortcut.
|
|
i = len(path) - 1
|
|
while i >= 0 and not at_exit():
|
|
d = path[i]
|
|
move(d)
|
|
steps = steps + 1
|
|
cursor = drone.get_loc()
|
|
pos = (cursor[0], cursor[1])
|
|
if pos in KNOWN:
|
|
was_new = not KNOWN[pos]['probed']
|
|
else:
|
|
was_new = True
|
|
probe_all(cursor[0], cursor[1])
|
|
quick_print("step ", steps, " moved ", d, " to ", cursor, " plan_len: ", len(path), " new: ", was_new)
|
|
if was_new:
|
|
break # re-plan in case probing revealed a shortcut
|
|
i = i - 1
|
|
|
|
quick_print("Reached exit at ", cursor, " in ", steps, " steps")
|
|
quick_print("explore_to_exit entry: ", tick_start, " exit: ", get_tick_count())
|
|
return True, get_tick_count() - tick_start
|
|
|
|
|
|
def pick_frontier(cursor, exit_pos):
|
|
best = None
|
|
best_score = None
|
|
for pos in frontier_cells():
|
|
score = manhattan(cursor, pos) + manhattan(pos, exit_pos)
|
|
if best_score == None or score < best_score:
|
|
best_score = score
|
|
best = pos
|
|
return best
|
|
|
|
|
|
LEFT_OF = {North: West, West: South, South: East, East: North}
|
|
RIGHT_OF = {North: East, East: South, South: West, West: North}
|
|
|
|
|
|
def left_hand_solve()-> tuple[bool, int]:
|
|
tick_start = get_tick_count()
|
|
facing = North # arbitrary starting orientation
|
|
while not at_exit():
|
|
left = LEFT_OF[facing]
|
|
right = RIGHT_OF[facing]
|
|
back = opposite(facing)
|
|
|
|
if can_move(left):
|
|
facing = left
|
|
elif can_move(facing):
|
|
pass # keep facing
|
|
elif can_move(right):
|
|
facing = right
|
|
else:
|
|
facing = back # dead end, turn around
|
|
|
|
move(facing)
|
|
return True, get_tick_count() - tick_start
|
|
|
|
|
|
def astar_solver_parallel():
|
|
tick_start = get_tick_count()
|
|
cursor = drone.get_loc()
|
|
exit_pos = measure()
|
|
ensure_cell(cursor[0], cursor[1])
|
|
probe_all(cursor[0], cursor[1])
|
|
|
|
steps = 0
|
|
step_budget = 20 # how far drones explore beyond their frontier
|
|
|
|
while not at_exit():
|
|
# Try to plan straight to the exit
|
|
path = None
|
|
if exit_pos in KNOWN:
|
|
path = a_star(cursor, exit_pos)
|
|
|
|
if path != None:
|
|
# Exploit known route
|
|
i = len(path) - 1
|
|
while i >= 0 and not at_exit():
|
|
d = path[i]
|
|
move(d)
|
|
steps = steps + 1
|
|
cursor = drone.get_loc()
|
|
pos = (cursor[0], cursor[1])
|
|
if pos in KNOWN:
|
|
was_new = not KNOWN[pos]['probed']
|
|
else:
|
|
was_new = True
|
|
probe_all(cursor[0], cursor[1])
|
|
if was_new:
|
|
break
|
|
i = i - 1
|
|
continue
|
|
|
|
# No path to exit — need to expand frontier in parallel
|
|
frontiers = frontier_cells()
|
|
if len(frontiers) == 0:
|
|
return False, 0
|
|
|
|
# Rank frontiers by promise: closer to exit = better
|
|
ranked = []
|
|
for f in frontiers:
|
|
score = manhattan(f, exit_pos)
|
|
ranked.append((score, f))
|
|
# Simple insertion sort (no built-in sort assumed)
|
|
i = 1
|
|
while i < len(ranked):
|
|
j = i
|
|
while j > 0 and ranked[j][0] < ranked[j - 1][0]:
|
|
tmp = ranked[j]
|
|
ranked[j] = ranked[j - 1]
|
|
ranked[j - 1] = tmp
|
|
j = j - 1
|
|
i = i + 1
|
|
|
|
# Spawn drones on the most promising frontiers; main drone takes one too
|
|
drones = []
|
|
main_target = None
|
|
i = 0
|
|
while i < len(ranked):
|
|
target = ranked[i][1]
|
|
maybe = spawn_drone(explore_frontier, target, step_budget)
|
|
if maybe:
|
|
drones.append(maybe)
|
|
else:
|
|
if main_target == None:
|
|
main_target = target
|
|
else:
|
|
break # out of slots
|
|
i = i + 1
|
|
|
|
# Main drone does its own frontier work (or merge-only if all slots filled)
|
|
if main_target != None:
|
|
my_discoveries = explore_frontier(main_target, step_budget)
|
|
merge_discoveries(my_discoveries)
|
|
|
|
# Collect parallel drones
|
|
while len(drones) > 0:
|
|
thread = drones.pop(0)
|
|
discoveries = wait_for(thread)
|
|
merge_discoveries(discoveries)
|
|
|
|
cursor = drone.get_loc()
|
|
|
|
return True, get_tick_count() - tick_start
|
|
|
|
def merge_discoveries(discoveries):
|
|
for entry in discoveries:
|
|
pos = entry[0]
|
|
openings = entry[1]
|
|
cell = ensure_cell(pos[0], pos[1])
|
|
for d in openings:
|
|
if d not in cell['openings']:
|
|
# Re-establish the passage in our graph
|
|
dx, dy = DIRS[d]
|
|
neighbour_pos = (pos[0] + dx, pos[1] + dy)
|
|
ensure_cell(neighbour_pos[0], neighbour_pos[1])
|
|
cell['openings'].add(d)
|
|
neighbour_cell = KNOWN[neighbour_pos]
|
|
neighbour_cell['openings'].add(opposite(d))
|
|
cell['neighbours'] = None
|
|
neighbour_cell['neighbours'] = None
|
|
cell['probed'] = True
|
|
|
|
|
|
def explore_frontier(target, step_budget):
|
|
# Drone has its own copy of KNOWN from spawn
|
|
cursor = drone.get_loc()
|
|
pos = (cursor[0], cursor[1])
|
|
discoveries = []
|
|
|
|
# Make sure spawn position is in our local map
|
|
ensure_cell(pos[0], pos[1])
|
|
|
|
# A* to the target using local KNOWN
|
|
path = a_star(pos, target)
|
|
if path == None:
|
|
# Shouldn't happen — frontier is by definition in KNOWN
|
|
return discoveries
|
|
|
|
# Walk the path. path[-1] is first step.
|
|
i = len(path) - 1
|
|
while i >= 0:
|
|
d = path[i]
|
|
move(d)
|
|
cursor = drone.get_loc()
|
|
pos = (cursor[0], cursor[1])
|
|
i = i - 1
|
|
|
|
# Now at target — probe it
|
|
probe_all(pos[0], pos[1])
|
|
cell = KNOWN[pos]
|
|
openings_copy = set()
|
|
for d in cell['openings']:
|
|
openings_copy.add(d)
|
|
discoveries.append((pos, openings_copy))
|
|
|
|
# Opportunistic exploration: greedy walk into unprobed neighbours
|
|
steps_used = len(path)
|
|
while steps_used < step_budget:
|
|
# Look for an unprobed direction from current cell
|
|
current_cell = KNOWN[pos]
|
|
next_dir = None
|
|
for d in current_cell['openings']:
|
|
dx, dy = DIRS[d]
|
|
neighbour_pos = (pos[0] + dx, pos[1] + dy)
|
|
if neighbour_pos in KNOWN:
|
|
if not KNOWN[neighbour_pos]['probed']:
|
|
next_dir = d
|
|
break
|
|
else:
|
|
# Unknown cell — definitely unprobed
|
|
next_dir = d
|
|
break
|
|
|
|
if next_dir == None:
|
|
break # no unprobed neighbours, stop here
|
|
|
|
move(next_dir)
|
|
steps_used = steps_used + 1
|
|
cursor = drone.get_loc()
|
|
pos = (cursor[0], cursor[1])
|
|
probe_all(pos[0], pos[1])
|
|
cell = KNOWN[pos]
|
|
openings_copy = set()
|
|
for d in cell['openings']:
|
|
openings_copy.add(d)
|
|
discoveries.append((pos, openings_copy))
|
|
|
|
return discoveries |