{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Sheet 5 Reinforcement Learning: Tim Racs and Derrick Hines"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Value iteration"
]
},
{
"cell_type": "code",
"execution_count": 88,
"metadata": {},
"outputs": [],
"source": [
"from mllab.rl import TicTacToe, LGame"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We implement value iteration and test it on two board games, Tic-tac-toe and the L-Game ([Wikipedia](https://en.wikipedia.org/wiki/L_game)).\n",
"\n",
"Both games have the same interface, so you can write your code to work for both games without further changes. The interface is shown in the following."
]
},
{
"cell_type": "code",
"execution_count": 89,
"metadata": {},
"outputs": [],
"source": [
"game = LGame\n",
"state = game.unique_states[0] # A list of all possible game states. The states are normalized.\n",
"state\n",
"states = game.unique_states\n",
"v = {s: 0 for s in states} \n",
"s = states[0]"
]
},
{
"cell_type": "code",
"execution_count": 90,
"metadata": {},
"outputs": [
{
"data": {
"image/svg+xml": [
"\n",
" "
],
"text/plain": [
"LGame(L1: ▜ (1, 2), L2: ▛▀ (2, 0), N1: (0, 0), N2: (3, 2))"
]
},
"execution_count": 90,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"state = game.unique_states[0]\n",
"'''\n",
"print(\"Current player:\", state.player)\n",
"print(\"Winner?\", state.winner()) # returns either None or a player number\n",
"print(\"Is terminal? \", state.is_terminal()) # is the game finished?\n",
"print(\"List of valid actions: \", state.valid_actions())\n",
"'''\n",
"state"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's see how we can apply an action and get a new state."
]
},
{
"cell_type": "code",
"execution_count": 91,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Place piece at (2, 0, 0, 4)\n"
]
},
{
"data": {
"image/svg+xml": [
"\n",
" "
],
"text/plain": [
"LGame(L1: ▜ (0, 2), L2: ▛▀ (2, 0), N1: (0, 0), N2: (1, 0))"
]
},
"execution_count": 91,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import random\n",
"\n",
"action = random.choice(state.valid_actions())\n",
"print(\"Place piece at\", action)\n",
"new_state = state.apply_action(action)\n",
"\n",
"new_state"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Normalized States\n",
"\n",
"The returned `new_state` is **not normalized**. Since the value function is only defined for normalized states. You have to normalize the state."
]
},
{
"cell_type": "code",
"execution_count": 92,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"1"
]
},
"execution_count": 92,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"new_state_normalized = new_state.normalized()\n",
"new_state_normalized.player"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Task 1\n",
"\n",
"Implement the `win_reward` and `value_iteration` function below. Use the game interface explained above.\n",
"\n",
"**Pitfall**: Let $s$ be the current state, $a$ be a valid action for $s$, and $s^\\prime$ be the state we get if action $a$ is taken in state $s$. Then, the reward for the player taking action $a$ in $s$ is _not_ $V(s^\\prime)$, but $-V(s^\\prime)$. We use one value function for both policies and only store the value for normalized states (for equivalence classes)."
]
},
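{
"cell_type": "markdown",
"metadata": {},
"source": [
"In equation form, the update that follows from the pitfall is the negamax fixed point\n",
"\n",
"$$V(s) = \\max_{a \\in A(s)} \\big[\\, r(s, a) - V(\\mathrm{norm}(s^\\prime)) \\,\\big] \\text{ for non-terminal } s, \\qquad V(s) = r(s) \\text{ for terminal } s,$$\n",
"\n",
"where $s^\\prime$ is the state obtained via `s.apply_action(a)` and $\\mathrm{norm}(\\cdot)$ denotes `normalized()`. The minus sign accounts for $V(s^\\prime)$ being measured from the opponent's perspective."
]
},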
{
"cell_type": "code",
"execution_count": 93,
"metadata": {},
"outputs": [],
"source": [
"from itertools import count\n",
"\n",
"\n",
"def win_reward(s, action=None):\n",
" \"\"\"\n",
" Compute the reward if in state s the given action is applied.\n",
" \n",
" If there is not winner, 0 is returned. Otherwise, 1 or -1 is returned,\n",
" depending on whether the current player has won or lost.\n",
" \"\"\"\n",
" # your code goes here\n",
" \n",
" if s.winner() == None:\n",
" return 0\n",
" elif s.winner() == s.player:\n",
" return 1\n",
" else: \n",
" return -1 \n",
" \n",
" \n",
"def win_reward2(s, action=None):\n",
" \"\"\"\n",
" Compute the reward if in state s the given action is applied.\n",
" \n",
" If there is not winner, 0 is returned. Otherwise, 1 or -1 is returned,\n",
" depending on whether the current player has won or lost.\n",
" \"\"\"\n",
" # your code goes here\n",
" \n",
" if action != None:\n",
" next_state = s.apply_action(action)\n",
" if next_state.winner() == None:\n",
" return 0\n",
" elif next_state.winner() == s.player:\n",
" return 1\n",
" else: #doesn't happen\n",
" return -1 \n",
" \n",
" else:\n",
" return 0\n",
" \n",
"\n",
"def value_iteration(game, asynchronous=True, reward=win_reward):\n",
" \"\"\"\n",
" Perform value iteration and return the value function.\n",
" \n",
" Parameters\n",
" ==========\n",
" \n",
" game: A game class (e.g. TicTacToe or LGame)\n",
" asynchronous: bool\n",
" Whether to do the updates directly on the current iterate\n",
" or if the iterate is only updated at the end of the max operation (updated in parallel, synchronous).\n",
" With other words, if the parameter is False the maximum is computed\n",
" independently (using old values, ignoring other updates). Otherwise, other max operations\n",
" in the same iteration are taken into account.\n",
" reward: function\n",
" A function which takes a state and an action and returns a number.\n",
" \"\"\"\n",
" states = game.unique_states\n",
" v = {s: 0 for s in states} # value function, initialized to 0\n",
" # your code goes here\n",
" change = True \n",
" \n",
" \n",
" if asynchronous == False:\n",
" \n",
" v_next = {s: 0 for s in states} \n",
" \n",
" for s in states:\n",
" if s.is_terminal():\n",
" v_next[s] = reward(s)\n",
" \n",
" \n",
" while change:\n",
" for s in states:\n",
" if not s.is_terminal(): \n",
" val = {a: (reward(s,a) - v[s.apply_action(a).normalized()]) for a in s.valid_actions()}\n",
" max_key = max(val.keys(), key=(lambda k: val[k]))\n",
" v_next[s] = val[max_key]\n",
" \n",
" change = (v_next != v) \n",
" v = v_next.copy()\n",
"\n",
" return(v)\n",
" \n",
" else:\n",
" \n",
" v_next = {s: 0 for s in states} \n",
" \n",
" for s in states:\n",
" if s.is_terminal():\n",
" v_next[s] = reward(s)\n",
" \n",
" \n",
" while(change):\n",
" for s in states:\n",
" if not s.is_terminal():\n",
" val = {a: (reward(s,a) - v_next[s.apply_action(a).normalized()]) for a in s.valid_actions()}\n",
" max_key = max(val.keys(), key=(lambda k: val[k]))\n",
" v_next[s] = val[max_key]\n",
" \n",
" change = (v_next != v) \n",
" \n",
" v = v_next.copy()\n",
" \n",
"\n",
" return(v)\n",
" \n",
" \n",
" \n",
" "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Compute an optimal value function for the game Tic-Tac-Toe."
]
},
{
"cell_type": "code",
"execution_count": 94,
"metadata": {},
"outputs": [],
"source": [
"vT_slow = value_iteration(TicTacToe, asynchronous=False)\n",
"vT_fast = value_iteration(TicTacToe, asynchronous=True)"
]
},
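{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (a sketch, not required by the sheet): Tic-Tac-Toe is acyclic, so both update schedules should converge to the same fixed point, and the two value functions should agree."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# compare the synchronous and asynchronous iterates;\n",
"# expected to print True if both reached the same fixed point\n",
"print(\"synchronous == asynchronous:\", vT_slow == vT_fast)"
]
},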
{
"cell_type": "code",
"execution_count": 95,
"metadata": {},
"outputs": [],
"source": [
"vT_slow2 = value_iteration(TicTacToe, asynchronous=False, reward = win_reward2)\n",
"vT_fast2 = value_iteration(TicTacToe, asynchronous=True, reward = win_reward2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now compute an optimal value function for the L-Game."
]
},
{
"cell_type": "code",
"execution_count": 96,
"metadata": {},
"outputs": [],
"source": [
"vL_slow = value_iteration(LGame, asynchronous=False)\n",
"vL_fast = value_iteration(LGame, asynchronous=True)"
]
},
{
"cell_type": "code",
"execution_count": 97,
"metadata": {},
"outputs": [],
"source": [
"vL_slow2 = value_iteration(LGame, asynchronous=False, reward = win_reward2)\n",
"vL_fast2 = value_iteration(LGame, asynchronous=True, reward = win_reward2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Write code to get an policy from a value function."
]
},
{
"cell_type": "code",
"execution_count": 98,
"metadata": {},
"outputs": [],
"source": [
"import random\n",
"\n",
"\n",
"class ValueFunctionBasedPolicy:\n",
" \"\"\"\n",
" A policy computed using a value function.\n",
" \n",
" Usage\n",
" =====\n",
" \n",
" # assume a value function is stored in v\n",
" policy = ValueFunctionBasedPolicy(v)\n",
"\n",
" # get a best action for state\n",
" action = policy[state]\n",
" \"\"\"\n",
"\n",
" def __init__(self, v, reward):\n",
" self._v = v\n",
" self._reward = reward\n",
" \n",
" def __getitem__(self, s):\n",
" \"\"\"Get an action for state s.\"\"\"\n",
" return random.choice(self.actions(s))\n",
" \n",
" def actions(self, s):\n",
" \"\"\"Get all actions which maximize the reward in state s.\"\"\"\n",
" actions = list(s.valid_actions())\n",
" if not actions:\n",
" return [] \n",
" # your code goes here # maximize reward or \"value\" ?\n",
" else:\n",
" val = {a: self.value(s,a) for a in actions}\n",
" max_key = max(val.keys(), key=(lambda k: val[k]))\n",
" max_val = val[max_key]\n",
" maxi_actions = [a for a,v in val.items() if v == max_val]\n",
" return(maxi_actions)\n",
"\n",
" def value(self, s, a):\n",
" return self._reward(s, a) - self._v[s.apply_action(a).normalized()]"
]
},
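{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal usage sketch (illustrative only; `demo_policy` and `demo_state` are hypothetical names, not part of the sheet): query the greedy policy in some non-terminal Tic-Tac-Toe state."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"demo_policy = ValueFunctionBasedPolicy(vT_fast, win_reward)\n",
"# pick a non-terminal state so that valid_actions() is non-empty\n",
"demo_state = next(s for s in TicTacToe.unique_states if not s.is_terminal())\n",
"print(\"Maximizing actions:\", demo_policy.actions(demo_state))\n",
"print(\"Sampled action:\", demo_policy[demo_state])"
]
},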
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let us watch the agent play against itself."
]
},
{
"cell_type": "code",
"execution_count": 99,
"metadata": {},
"outputs": [],
"source": [
"from mllab.rl import self_play"
]
},
{
"cell_type": "code",
"execution_count": 100,
"metadata": {},
"outputs": [
{
"data": {
"image/svg+xml": [
"\n",
" "
],
"text/plain": [
"TicTacToe(o|x|o, x|o|x, x|o|x)"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Game finished.\n"
]
},
{
"data": {
"text/plain": [
"(TicTacToe(o|x|o, x|o|x, x|o|x), 9)"
]
},
"execution_count": 100,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"self_play(TicTacToe, ValueFunctionBasedPolicy(vT_fast, win_reward), sleep=2)"
]
},
{
"cell_type": "code",
"execution_count": 101,
"metadata": {},
"outputs": [
{
"data": {
"image/svg+xml": [
"\n",
" "
],
"text/plain": [
"TicTacToe(x|x|o, o|o|x, x|x|o)"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Game finished.\n"
]
},
{
"data": {
"text/plain": [
"(TicTacToe(x|x|o, o|o|x, x|x|o), 9)"
]
},
"execution_count": 101,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"self_play(TicTacToe, ValueFunctionBasedPolicy(vT_fast2, win_reward2), sleep=2)"
]
},
{
"cell_type": "code",
"execution_count": 102,
"metadata": {},
"outputs": [
{
"data": {
"image/svg+xml": [
"\n",
" "
],
"text/plain": [
"LGame(L1: ▄▟ (2, 0), L2: ▛▀ (1, 0), N1: (0, 3), N2: (2, 3))"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Stopped after 500 moves.\n"
]
},
{
"data": {
"text/plain": [
"(LGame(L1: ▙▄ (2, 1), L2: ▛▀ (1, 0), N1: (0, 0), N2: (0, 3)), 501)"
]
},
"execution_count": 102,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"self_play(LGame, ValueFunctionBasedPolicy(vL_fast, win_reward), sleep= 0.01, max_steps=500)"
]
},
{
"cell_type": "code",
"execution_count": 103,
"metadata": {},
"outputs": [
{
"data": {
"image/svg+xml": [
"\n",
" "
],
"text/plain": [
"LGame(L1: ▛ (0, 2), L2: ▄▟ (2, 1), N1: (0, 1), N2: (2, 1))"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Stopped after 500 moves.\n"
]
},
{
"data": {
"text/plain": [
"(LGame(L1: ▛▀ (1, 0), L2: ▄▟ (2, 1), N1: (2, 1), N2: (2, 2)), 501)"
]
},
"execution_count": 103,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"self_play(LGame, ValueFunctionBasedPolicy(vL_fast2, win_reward2), sleep= 0.01, max_steps=500)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### L-Game Insights and Improvements"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The L-Game does never finish if both players are perfect. There some states though, which are special. Let us use the value function to get some insights.\n",
"\n",
"### Task 2\n",
"\n",
"Collect all states with negative value. How many are terminal, how many are not terminal? What does it mean that a state has a negative value but is not terminal? Ans/ States with negative value that are not terminal are states in which the current player has not already lost but that will definetely lose if the other player uses a perfect strategy"
]
},
{
"cell_type": "code",
"execution_count": 104,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The number of states with negative value is : 29\n",
"The number of states with negative value that are terminal is : 15\n",
"The number of states with negative value that are not terminal is : 14\n"
]
}
],
"source": [
"# your code goes here\n",
"\n",
"\n",
"negative_states = [s for s,v in vL_fast.items() if vL_fast[s] < 0]\n",
"\n",
"num_negative = len(negative_states)\n",
"\n",
"negative_term = [s for s in negative_states if s.is_terminal()]\n",
"\n",
"qterm = len(negative_term)\n",
"\n",
"negative_nonterm = [s for s in negative_states if not s.is_terminal()]\n",
"\n",
"qnonterm = len(negative_nonterm)\n",
"\n",
"\n",
"print(\"The number of states with negative value is : \", num_negative)\n",
"print(\"The number of states with negative value that are terminal is : \", qterm)\n",
"print(\"The number of states with negative value that are not terminal is : \", qnonterm)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let the agent start in every state with negative reward which is not terminal and let it play against itself. Use\n",
"```python\n",
"final_state, steps = self_play(state, ValueFunctionBasedPolicy(vL_fast), max_steps=100, sleep=0)\n",
"```\n",
"to compute the steps until the game terminates and it final state.\n",
"\n",
"Print the number of steps it takes to terminate and how the game ended. Run the code several times, what do you observe? Ans/ The number of steps as well as the final state change sometimes. "
]
},
{
"cell_type": "code",
"execution_count": 105,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{LGame(L1: ▜ (0, 0), L2: ▛ (0, 2), N1: (2, 0), N2: (3, 1)): (LGame(L1: ▟ (1, 2), L2: ▄▟ (1, 0), N1: (0, 2), N2: (3, 1)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▙ (0, 1), N1: (3, 2), N2: (3, 3)): (LGame(L1: ▙ (1, 0), L2: ▜ (1, 1), N1: (0, 0), N2: (0, 2)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (0, 1), N2: (3, 3)): (LGame(L1: ▙▄ (2, 0), L2: ▟ (0, 1), N1: (1, 0), N2: (2, 3)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (2, 0), N2: (3, 3)): (LGame(L1: ▀▜ (0, 1), L2: ▙▄ (1, 1), N1: (0, 0), N2: (2, 0)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (0, 0), N2: (3, 2)): (LGame(L1: ▀▜ (0, 1), L2: ▛▀ (1, 0), N1: (2, 3), N2: (3, 1)),\n",
" 2),\n",
" LGame(L1: ▜ (1, 2), L2: ▛ (0, 1), N1: (2, 2), N2: (3, 0)): (LGame(L1: ▟ (1, 2), L2: ▄▟ (0, 0), N1: (2, 2), N2: (3, 1)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (3, 1), N2: (3, 3)): (LGame(L1: ▙ (1, 0), L2: ▛ (0, 1), N1: (2, 3), N2: (3, 2)),\n",
" 6),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (0, 0), N2: (3, 3)): (LGame(L1: ▀▜ (0, 1), L2: ▛▀ (1, 0), N1: (2, 3), N2: (3, 2)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (2, 0), N2: (3, 1)): (LGame(L1: ▜ (0, 2), L2: ▟ (1, 1), N1: (0, 1), N2: (1, 0)),\n",
" 4),\n",
" LGame(L1: ▜ (1, 2), L2: ▛ (0, 1), N1: (2, 0), N2: (2, 2)): (LGame(L1: ▟ (1, 2), L2: ▛ (1, 1), N1: (0, 3), N2: (1, 0)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (3, 1), N2: (3, 2)): (LGame(L1: ▀▜ (0, 1), L2: ▙▄ (1, 1), N1: (0, 0), N2: (3, 1)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (3, 2), N2: (3, 3)): (LGame(L1: ▀▜ (0, 1), L2: ▛▀ (1, 0), N1: (2, 3), N2: (3, 2)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (2, 0), N2: (3, 2)): (LGame(L1: ▀▜ (0, 1), L2: ▙▄ (1, 1), N1: (0, 0), N2: (2, 0)),\n",
" 2),\n",
" LGame(L1: ▜ (1, 2), L2: ▙ (0, 1), N1: (0, 2), N2: (3, 2)): (LGame(L1: ▙ (1, 0), L2: ▛ (0, 1), N1: (2, 3), N2: (3, 2)),\n",
" 2)}"
]
},
"execution_count": 105,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# your code goes here\n",
"\n",
"results = {s: 0 for s in negative_nonterm}\n",
"\n",
"avg_num_steps = {s:0 for s in negative_nonterm}\n",
"\n",
"for n in range(10):\n",
"\n",
" for state in negative_nonterm:\n",
" results[state] = self_play(state, ValueFunctionBasedPolicy(vL_fast, win_reward), max_steps=100, sleep=0)\n",
" avg_num_steps[state] = (1/(n+1))* results[state][1] + (n/(n+1))* avg_num_steps[state]\n",
"results"
]
},
{
"cell_type": "code",
"execution_count": 106,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{LGame(L1: ▜ (0, 0), L2: ▛ (0, 2), N1: (2, 0), N2: (3, 1)): 2.4000000000000004,\n",
" LGame(L1: ▜ (0, 2), L2: ▙ (0, 1), N1: (3, 2), N2: (3, 3)): 2.8000000000000003,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (0, 1), N2: (3, 3)): 2.4000000000000004,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (2, 0), N2: (3, 3)): 3.800000000000001,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (0, 0), N2: (3, 2)): 2.0,\n",
" LGame(L1: ▜ (1, 2), L2: ▛ (0, 1), N1: (2, 2), N2: (3, 0)): 2.6000000000000005,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (3, 1), N2: (3, 3)): 5.0,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (0, 0), N2: (3, 3)): 3.0,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (2, 0), N2: (3, 1)): 2.6,\n",
" LGame(L1: ▜ (1, 2), L2: ▛ (0, 1), N1: (2, 0), N2: (2, 2)): 2.0,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (3, 1), N2: (3, 2)): 3.6,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (3, 2), N2: (3, 3)): 3.3999999999999995,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (2, 0), N2: (3, 2)): 3.4,\n",
" LGame(L1: ▜ (1, 2), L2: ▙ (0, 1), N1: (0, 2), N2: (3, 2)): 2.4000000000000004}"
]
},
"execution_count": 106,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"avg_num_steps"
]
},
{
"cell_type": "code",
"execution_count": 107,
"metadata": {},
"outputs": [
{
"data": {
"image/svg+xml": [
"\n",
" "
],
"text/plain": [
"LGame(L1: ▜ (0, 2), L2: ▄▟ (1, 0), N1: (0, 1), N2: (3, 2))"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Game finished.\n"
]
},
{
"data": {
"text/plain": [
"(LGame(L1: ▜ (0, 2), L2: ▄▟ (1, 0), N1: (0, 1), N2: (3, 2)), 4)"
]
},
"execution_count": 107,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"self_play(negative_nonterm[6], ValueFunctionBasedPolicy(vL_fast, win_reward), max_steps=100, sleep=2)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Use the computed value function to design a new reward which improves the behaivor."
]
},
{
"cell_type": "code",
"execution_count": 108,
"metadata": {},
"outputs": [],
"source": [
"good_positions = set(s for s in LGame.unique_states if vL_fast[s] > 0 and s.winner() is None)\n",
"bad_positions = set(s for s in LGame.unique_states if vL_fast[s] < 0 and s.winner() is None)\n",
"\n",
"\n",
"def asap_win_reward(s, a=None):\n",
" # your code goes here\n",
"\n",
" \n",
" if s.is_terminal():\n",
" return win_reward(s,a)\n",
" \n",
" else: \n",
" if s.apply_action(a).normalized() in bad_positions:\n",
" return 1\n",
" elif s.apply_action(a).normalized() in good_positions:\n",
" return -1\n",
" else:\n",
" return 0\n",
" \n",
" \n",
"vL_improved = value_iteration(LGame, asynchronous= True, reward = asap_win_reward)"
]
},
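{
"cell_type": "markdown",
"metadata": {},
"source": [
"Written out, the shaped reward above is\n",
"\n",
"$$\\tilde{r}(s, a) = \\begin{cases} r(s, a) & \\text{if } s \\text{ is terminal,} \\\\ +1 & \\text{if } \\mathrm{norm}(s^\\prime) \\text{ is a losing position for the opponent,} \\\\ -1 & \\text{if } \\mathrm{norm}(s^\\prime) \\text{ is a winning position for the opponent,} \\\\ 0 & \\text{otherwise.} \\end{cases}$$\n",
"\n",
"The agent is thus rewarded immediately for trapping the opponent instead of only at termination, which should push it towards finishing won games as quickly as possible."
]
},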
{
"cell_type": "code",
"execution_count": 109,
"metadata": {},
"outputs": [
{
"data": {
"image/svg+xml": [
"\n",
" "
],
"text/plain": [
"LGame(L1: ▄▟ (0, 0), L2: ▛▀ (2, 1), N1: (3, 2), N2: (3, 3))"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Stopped after 500 moves.\n"
]
},
{
"data": {
"text/plain": [
"(LGame(L1: ▙▄ (0, 1), L2: ▛▀ (2, 1), N1: (1, 0), N2: (3, 3)), 501)"
]
},
"execution_count": 109,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"self_play(LGame, ValueFunctionBasedPolicy(vL_improved, asap_win_reward), sleep= 0.01, max_steps=500)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, compute the number of steps before termination again and compare. Maybe, also read the Wikipedia article on the L-Game referenced at the beginning."
]
},
{
"cell_type": "code",
"execution_count": 110,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{LGame(L1: ▜ (0, 0), L2: ▛ (0, 2), N1: (2, 0), N2: (3, 1)): (LGame(L1: ▟ (1, 2), L2: ▀▜ (1, 0), N1: (0, 2), N2: (3, 1)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▙ (0, 1), N1: (3, 2), N2: (3, 3)): (LGame(L1: ▙ (1, 0), L2: ▛ (0, 1), N1: (2, 3), N2: (3, 2)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (0, 1), N2: (3, 3)): (LGame(L1: ▙▄ (2, 0), L2: ▄▟ (1, 1), N1: (0, 1), N2: (1, 0)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (2, 0), N2: (3, 3)): (LGame(L1: ▀▜ (0, 1), L2: ▙▄ (1, 1), N1: (0, 0), N2: (2, 0)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (0, 0), N2: (3, 2)): (LGame(L1: ▀▜ (0, 1), L2: ▛▀ (1, 0), N1: (2, 3), N2: (3, 2)),\n",
" 2),\n",
" LGame(L1: ▜ (1, 2), L2: ▛ (0, 1), N1: (2, 2), N2: (3, 0)): (LGame(L1: ▟ (1, 2), L2: ▜ (1, 0), N1: (0, 3), N2: (1, 2)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (3, 1), N2: (3, 3)): (LGame(L1: ▀▜ (0, 1), L2: ▛▀ (1, 0), N1: (2, 3), N2: (3, 2)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (0, 0), N2: (3, 3)): (LGame(L1: ▙▄ (2, 0), L2: ▙▄ (1, 1), N1: (0, 1), N2: (1, 0)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (2, 0), N2: (3, 1)): (LGame(L1: ▀▜ (0, 1), L2: ▛▀ (1, 0), N1: (2, 3), N2: (3, 1)),\n",
" 2),\n",
" LGame(L1: ▜ (1, 2), L2: ▛ (0, 1), N1: (2, 0), N2: (2, 2)): (LGame(L1: ▄▟ (2, 1), L2: ▛▀ (1, 1), N1: (0, 0), N2: (3, 0)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (3, 1), N2: (3, 2)): (LGame(L1: ▀▜ (0, 1), L2: ▛▀ (1, 0), N1: (2, 3), N2: (3, 2)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (3, 2), N2: (3, 3)): (LGame(L1: ▀▜ (0, 1), L2: ▛▀ (1, 0), N1: (2, 3), N2: (3, 2)),\n",
" 2),\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (2, 0), N2: (3, 2)): (LGame(L1: ▀▜ (0, 1), L2: ▛▀ (1, 0), N1: (2, 3), N2: (3, 2)),\n",
" 2),\n",
" LGame(L1: ▜ (1, 2), L2: ▙ (0, 1), N1: (0, 2), N2: (3, 2)): (LGame(L1: ▙ (1, 0), L2: ▛▀ (1, 1), N1: (0, 1), N2: (3, 2)),\n",
" 2)}"
]
},
"execution_count": 110,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# your code goes here\n",
"\n",
"results2 = {s: 0 for s in negative_nonterm}\n",
"\n",
"avg_num_steps2 = {s:0 for s in negative_nonterm}\n",
"\n",
"for n in range(10):\n",
"\n",
" for state in negative_nonterm:\n",
" results2[state] = self_play(state, ValueFunctionBasedPolicy(vL_improved, asap_win_reward), max_steps=100, sleep=0)\n",
" avg_num_steps2[state] = (1/(n+1))* results2[state][1] + (n/(n+1))* avg_num_steps2[state]\n",
"results2"
]
},
{
"cell_type": "code",
"execution_count": 111,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{LGame(L1: ▜ (0, 0), L2: ▛ (0, 2), N1: (2, 0), N2: (3, 1)): 2.0,\n",
" LGame(L1: ▜ (0, 2), L2: ▙ (0, 1), N1: (3, 2), N2: (3, 3)): 2.0,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (0, 1), N2: (3, 3)): 2.0,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (2, 0), N2: (3, 3)): 2.0,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (0, 0), N2: (3, 2)): 2.0,\n",
" LGame(L1: ▜ (1, 2), L2: ▛ (0, 1), N1: (2, 2), N2: (3, 0)): 2.0,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (3, 1), N2: (3, 3)): 2.0,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (0, 0), N2: (3, 3)): 2.0,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (2, 0), N2: (3, 1)): 2.0,\n",
" LGame(L1: ▜ (1, 2), L2: ▛ (0, 1), N1: (2, 0), N2: (2, 2)): 2.0,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (3, 1), N2: (3, 2)): 2.0,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (3, 2), N2: (3, 3)): 2.0,\n",
" LGame(L1: ▜ (0, 2), L2: ▀▜ (1, 0), N1: (2, 0), N2: (3, 2)): 2.0,\n",
" LGame(L1: ▜ (1, 2), L2: ▙ (0, 1), N1: (0, 2), N2: (3, 2)): 2.0}"
]
},
"execution_count": 111,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"avg_num_steps2 "
]
},
{
"cell_type": "code",
"execution_count": 112,
"metadata": {},
"outputs": [
{
"data": {
"image/svg+xml": [
"\n",
" "
],
"text/plain": [
"LGame(L1: ▀▜ (0, 1), L2: ▙▄ (1, 1), N1: (0, 0), N2: (3, 1))"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Game finished.\n"
]
},
{
"data": {
"text/plain": [
"(LGame(L1: ▀▜ (0, 1), L2: ▙▄ (1, 1), N1: (0, 0), N2: (3, 1)), 2)"
]
},
"execution_count": 112,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"self_play(negative_nonterm[6], ValueFunctionBasedPolicy(vL_improved, asap_win_reward), max_steps=100, sleep=2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Notice that with the improved value function the game finishes after two steps. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Policy Iteration"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Task 3\n",
"\n",
"Implement `policy_evaluation` and `policy_improvement` below."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The assumption does not hold because if a player applies an action, the result of that action will depend on the action that the other player makes. If white applies an action, then black another one and the environment returns this new state to white. "
]
},
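{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, the rollout-based evaluation implemented below computes, for fixed deterministic policies $p_1$ and $p_2$,\n",
"\n",
"$$V_{p_1, p_2}(s) = \\sum_{t=0}^{T-1} (-1)^t \\, r(s_t, a_t) + (-1)^T r(s_T),$$\n",
"\n",
"where the players alternate ($a_t$ is chosen by $p_1$ for even $t$ and by $p_2$ for odd $t$) and $s_T$ is terminal. Because the policies are deterministic, revisiting a (sign, state) pair means the rollout cycles forever: the value is then $\\pm\\infty$ if one pass through the cycle accumulates positive or negative reward, and otherwise the reward collected before entering the cycle."
]
},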
{
"cell_type": "code",
"execution_count": 113,
"metadata": {},
"outputs": [],
"source": [
"import random\n",
"import math\n",
"from mllab.rl import Policy\n",
"from itertools import cycle\n",
"\n",
"\n",
"def policy_improvement(p1, p2, v1, v2, reward):\n",
" \"\"\"\n",
" Compute an improved policy for the states using the value function v.\n",
" \n",
" Parameters\n",
" ==========\n",
" p1: dict\n",
" Policy for player 1\n",
" p2: dict\n",
" Policy for player 2\n",
" v1: dict\n",
" Value function for player 1\n",
" v2: dict\n",
" Value function for player 2\n",
" reward: callable\n",
" The reward function\n",
" Returns\n",
" =======\n",
" (dict, dict)\n",
" Two policies, one for player 1, the second for player 2\n",
" \"\"\"\n",
" \n",
" states = list(v1.keys())\n",
"\n",
"\n",
" # compute greedy update for p1 (as before)\n",
" for s in states:\n",
" # your code goes here\n",
" if not s.is_terminal():\n",
" val = {a: reward(s,a) - v1[s.apply_action(a).normalized()] for a in s.valid_actions()}\n",
" p1[s] = max(val.keys(), key=(lambda k: val[k]))\n",
" \n",
" \n",
" \n",
" # I tried this instead but it did not work either\n",
" '''for s in states:\n",
" # your code goes here\n",
" if not s.is_terminal():\n",
" res = {a: s.apply_action(a).normalized() for a in s.valid_actions()}\n",
" res_nt = ({a : res[a] for a,r in res.items() \n",
" if not r.is_terminal()} ) \n",
" \n",
" res_t = ({a : res[a] for a,r in res.items() \n",
" if r.is_terminal()} ) \n",
"\n",
" val1 = ({a: reward(s,a) + v1[res_nt[a].apply_action(p2[res_nt[a]]).normalized()] \n",
" for a in res_nt.keys()}) \n",
" \n",
" val2 = {a: reward(s,a) - v1[res_t[a]] for a in res_t.keys()}\n",
" \n",
" val = {**val1, **val2}\n",
" \n",
" p1[s] = max(val.keys(), key=(lambda k: val[k])) '''\n",
" \n",
" \n",
"\n",
" # compute best response to p1 for p2\n",
" v1 = policy_evaluation(states, p1, p2, reward)\n",
" v2 = policy_evaluation(states, p2, p1, reward)\n",
" p2 = {}\n",
" \n",
"\n",
" while True:\n",
" print('P', end='') # print a 'P' for each iteration\n",
" # your code goes here\n",
" for s in states:\n",
" if not s.is_terminal():\n",
" res = {a: s.apply_action(a).normalized() for a in s.valid_actions()}\n",
" res_nt = ({a : res[a] for a,r in res.items() \n",
" if not r.is_terminal()} ) \n",
" \n",
" res_t = ({a : res[a] for a,r in res.items() \n",
" if r.is_terminal()} ) \n",
"\n",
" val1 = ({a: reward(s,a) + v2[res_nt[a].apply_action(p1[res_nt[a]]).normalized()] \n",
" for a in res_nt.keys()}) \n",
" \n",
" val2 = {a: reward(s,a) - v2[res_t[a]] for a in res_t.keys()}\n",
" \n",
" val = {**val1, **val2}\n",
" \n",
" p2[s] = max(val.keys(), key=(lambda k: val[k]))\n",
" \n",
" v2_new = policy_evaluation(states, p2, p1, reward)\n",
" if v2 == v2_new: break\n",
" v2 = v2_new.copy() \n",
" \n",
" \n",
" return p1, p2\n",
"\n",
"\n",
"def policy_evaluation(states, p1, p2, reward):\n",
" \"\"\"\n",
" Compute the value function for given policies.\n",
" \n",
" Parameters\n",
" ==========\n",
" states: list of normalized states\n",
" p1: dict\n",
" Policy for first player\n",
" p2: dict\n",
" Policy for second player\n",
" \"\"\"\n",
" v = {}\n",
" \n",
" for s in states: \n",
" current = s\n",
" rewards = []\n",
" final_reward = 0\n",
" sign = 1 # cf. pitfall\n",
" # store (sign, state) pairs and check for cycles!\n",
" trajectory = []\n",
" has_cycle = False\n",
" # iterate until the current state is terminal (i.e., None)\n",
" if current.is_terminal(): \n",
" final_reward = sign * reward(current) ## we need to some final reward +- 1\n",
" current = None\n",
" while current is not None and not has_cycle:\n",
" # your code goes here \n",
" \n",
" pair = (sign, current)\n",
" \n",
" if pair in trajectory:\n",
" has_cycle = True \n",
" \n",
" trajectory.append(pair)\n",
" \n",
" if sign == 1: \n",
" action = p1[current]\n",
" else:\n",
" action = p2[current]\n",
" \n",
" rewards.append(sign * reward(current,action))\n",
" \n",
" \n",
" current = current.apply_action(action).normalized()\n",
" sign *= -1\n",
" \n",
" if current.is_terminal(): \n",
" final_reward = sign* reward(current)\n",
" current = None\n",
" \n",
" if not has_cycle: \n",
" v[s] = sum(rewards) + final_reward ## add the +-1 \n",
" else:\n",
" first_ap = trajectory.index(pair) ## cycle starts in the path\n",
" sum_0 = sum(rewards[:first_ap])\n",
" sum_1 = sum(rewards[first_ap:]) ## sum of the value of the cycle\n",
" if(sum_1 > 0):\n",
" v[s] = math.inf \n",
" elif(sum_1 < 0):\n",
" v[s] = - math.inf\n",
" else:\n",
" v[s] = sum_0\n",
" \n",
" return v\n",
"\n",
"\n",
"def policy_iteration(game, reward):\n",
" \"\"\"\n",
" Perform policy iteration for the given game, and reward function.\n",
" \n",
" Returns\n",
" ======= \n",
" policy: Policy\n",
" \"\"\"\n",
" states = game.unique_states\n",
" p1 = {s: random.choice(s.valid_actions()) for s in states if s.valid_actions()}\n",
" p2 = {s: random.choice(s.valid_actions()) for s in states if s.valid_actions()}\n",
" v1, v2 = {}, {}\n",
" while True:\n",
" v1_new = policy_evaluation(states, p1, p2, reward) ##each value function for each player\n",
" v2_new = policy_evaluation(states, p2, p1, reward) \n",
" if v1 == v1_new and v2 == v2_new: \n",
" break\n",
" v1, v2 = v1_new.copy(), v2_new.copy() \n",
" p1, p2 = policy_improvement(p1, p2, v1, v2, reward)\n",
" print('.', end='')\n",
" print('')\n",
" return Policy(game, p1) \n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 114,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"PPPP.PPP.PPP.PPP.PP.PP.P.P.\n"
]
}
],
"source": [
"pT_policy_iteration = policy_iteration(TicTacToe, win_reward)"
]
},
{
"cell_type": "code",
"execution_count": 115,
"metadata": {},
"outputs": [],
"source": [
"# self_play(TicTacToe.unique_states[148], pT_policy_iteration, sleep=2)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The policy iteration does might not converge for the L-Game, more specifically, finding the best response in `policy_improvement`. What it the reason? Can you fix it? (optional)"
]
},
{
"cell_type": "code",
"execution_count": 116,
"metadata": {},
"outputs": [],
"source": [
"#pL_pi = policy_iteration(LGame, win_reward)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}