Unicorn 处理间接跳转
@source: https://bbs.kanxue.com/thread-280231-1.htm
Unicorn#
基于 qemu,支持多架构 CPU
Python/C 脚本
↓
Unicorn API
↓
QEMU TCG (Tiny Code Generator) ← 核心翻译引擎
↓
翻译为宿主机原生指令执行
核心流程#
# unicorn_skeleton.py
# Minimal end-to-end Unicorn session: create engine, map memory, load state,
# install a hook, run.
import unicorn
from unicorn.arm64_const import *

# Step 1: create the engine -- pick the architecture and mode.
uc = unicorn.Uc(unicorn.UC_ARCH_ARM64, unicorn.UC_MODE_ARM)

# Step 2: map memory -- this defines the sandbox boundary.
uc.mem_map(0x1000, 0x4000)  # 16 KB starting at 0x1000

# Step 3: write data -- load the code and the initial register state.
# Four AArch64 NOPs (0xD503201F, little-endian) fill the emulated range
# 0x1000..0x1010.  All-zero bytes would decode as UDF #0 and make emu_start
# fail with an exception instead of running to the end address.
uc.mem_write(0x1000, b"\x1f\x20\x03\xd5" * 4)
uc.reg_write(UC_ARM64_REG_SP, 0x4000)


# Step 4: install hooks -- optional observation points.
def on_insn(uc, address, size, user_data):
    """UC_HOOK_CODE callback: print the address of each traced instruction."""
    print(f"PC=0x{address:x}")


uc.hook_add(unicorn.UC_HOOK_CODE, on_insn)

# Step 5: start emulation -- give the begin and end addresses.
uc.emu_start(begin=0x1000, until=0x1010)
内存映射需要页对齐(4KB)
hook#
可以对特定事件插桩
Unicorn 执行引擎
│
├── 执行每条指令前 → UC_HOOK_CODE
├── 读内存时 → UC_HOOK_MEM_READ
├── 写内存时 → UC_HOOK_MEM_WRITE
├── 访问未映射内存 → UC_HOOK_MEM_UNMAPPED
├── 执行中断/syscall → UC_HOOK_INTR (中断/异常,如 ARM64 SVC) / UC_HOOK_INSN (特定指令,如 x86 SYSCALL)
└── 遇到无效指令 → UC_HOOK_INSN_INVALID
# trace_example.py
# Trace three AArch64 instructions, disassembling each one as it executes,
# and lazily map pages on unmapped data accesses.
import capstone
import unicorn
from unicorn.arm64_const import *

CODE = bytes([
    0x20, 0x00, 0x80, 0xD2,  # MOV X0, #1
    0x41, 0x00, 0x80, 0xD2,  # MOV X1, #2
    0x02, 0x00, 0x01, 0x8B,  # ADD X2, X0, X1
])
BASE = 0x1000
PAGE_SIZE = 0x1000

uc = unicorn.Uc(unicorn.UC_ARCH_ARM64, unicorn.UC_MODE_ARM)
uc.mem_map(BASE, PAGE_SIZE)
uc.mem_write(BASE, CODE)

# Newer capstone releases name the architecture CS_ARCH_AARCH64, older ones
# CS_ARCH_ARM64; accept whichever the installed version provides.
_CS_ARCH = getattr(capstone, "CS_ARCH_AARCH64", None)
if _CS_ARCH is None:
    _CS_ARCH = capstone.CS_ARCH_ARM64
cs = capstone.Cs(_CS_ARCH, capstone.CS_MODE_ARM)


def hook_code(uc, address, size, user_data):
    """UC_HOOK_CODE callback: disassemble and print the traced instruction."""
    insn_bytes = uc.mem_read(address, size)
    for insn in cs.disasm(bytes(insn_bytes), address):
        print(f" [0x{address:08x}] {insn.mnemonic:<8} {insn.op_str}")


def hook_mem_invalid(uc, access_type, address, size, value, user_data):
    """UC_HOOK_MEM_UNMAPPED callback.

    access_type is one of:
      - UC_MEM_READ_UNMAPPED
      - UC_MEM_WRITE_UNMAPPED
      - UC_MEM_FETCH_UNMAPPED  (executing an unmapped address)

    Returns False to abort emulation on a fetch, otherwise maps the page
    containing ``address`` and returns True so the access is retried.
    """
    access_names = {
        unicorn.UC_MEM_READ_UNMAPPED: "READ",
        unicorn.UC_MEM_WRITE_UNMAPPED: "WRITE",
        unicorn.UC_MEM_FETCH_UNMAPPED: "FETCH",
    }
    # .get() keeps the hook from raising inside the emulator callback if an
    # unexpected access type is ever delivered.
    name = access_names.get(access_type, str(access_type))
    print(f"[!] {name} @ 0x{address:x} (size={size})")
    if access_type == unicorn.UC_MEM_FETCH_UNMAPPED:
        return False
    # Failed read/write: map the page and retry.
    page_addr = address & ~0xFFF
    uc.mem_map(page_addr, PAGE_SIZE)
    return True


uc.hook_add(unicorn.UC_HOOK_CODE, hook_code)
uc.hook_add(unicorn.UC_HOOK_MEM_UNMAPPED, hook_mem_invalid)
uc.emu_start(BASE, BASE + len(CODE))
hook 地址范围过滤
# Only trace code executed in the address range 0x1000..0x2000.
uc.hook_add(unicorn.UC_HOOK_CODE, hook_code, begin=0x1000, end=0x2000)
# Only watch memory writes in the range that starts at 0x5000 and ends at 0x5004.
# NOTE(review): hook_write is not defined in this snippet; it must be a
# UC_HOOK_MEM_WRITE callback supplied by the reader.
uc.hook_add(unicorn.UC_HOOK_MEM_WRITE, hook_write, begin=0x5000,
end=0x5004)
TCG 执行原理#
ARM64 字节码
│
▼
┌─────────────────────────────────┐
│ QEMU TCG 前端 │
│ ARM64 解码器 → TCG IR 中间表示 │
│ │
│ ADD X2, X0, X1 │
│ ↓ 翻译为 TCG IR │
│ tcg_gen_add_i64(X2, X0, X1) │
└─────────────────────────────────┘
│
▼
┌─────────────────────────────────┐
│ QEMU TCG 后端 │
│ 把 TCG IR 编译为宿主机指令 │
│ (Mac 是 ARM64,直接接近 │
│ 原生;x86 Mac 需要翻译) │
└─────────────────────────────────┘
│
▼
翻译块 (Translation Block, TB)
缓存起来,下次同样代码直接复用
│
▼
CPU 执行
翻译块 (TB)#
一段连续直线执行的代码,遇到跳转就结束
0x1000: MOV X0, #1 ┐
0x1004: MOV X1, #2 ├── TB1(直线,无跳转)
0x1008: ADD X2, X0, X1┘
0x100C: CBZ X2, 0x2000 ← 条件跳转,TB1 结束
0x1010: MOV X3, #9 ┐
0x1014: RET ┘── TB2
0x2000: MOV X3, #0 ┐
0x2004: RET ┘── TB3
- Unicorn 在翻译每个 TB 时为每条指令注入了回调点(UC_HOOK_CODE),因此 hook 的性能开销在复杂循环中很显著,善用地址范围过滤精准插桩
- 第一次执行需要翻译,比后续慢,之后命中 TB Cache 直接执行
- 修改已翻译过的代码所在内存后,旧的 TB 可能仍在缓存中:先调用 emu_stop() 停止模拟,清除翻译缓存(Unicorn 2 可用 uc.ctl_remove_cache(begin, end))后再重新启动,让这段代码被重新翻译
反调试对抗#
| 反调试手段 | 检测方式 | Unicorn 对策 |
|---|---|---|
| ptrace self | syscall 返回值 | hook syscall 伪造返回值 |
| 时间检测 | 两次 gettimeofday 比较 | hook syscall,返回固定值 |
| smc | 运行时解密 | UC_HOOK_MEM_WRITE 捕获解密时机 |
# syscall_smc_hook.py
# Anti-anti-debug hooks for x86-64 Linux code: fake syscall results and log
# self-modifying-code (SMC) writes into the code region.
import struct

import unicorn
from unicorn.x86_const import *

FAKE_TIME = 1700000000  # forged timestamp returned for gettimeofday

# SMC: address range treated as code.
CODE_START = 0x1000
CODE_END = 0x5000
decrypted_regions = []  # (address, size) of every observed write into the code range

uc = unicorn.Uc(unicorn.UC_ARCH_X86, unicorn.UC_MODE_64)


def hook_syscall(uc, user_data):
    """UC_HOOK_INSN callback for SYSCALL: forge results by syscall number in RAX."""
    rax = uc.reg_read(UC_X86_REG_RAX)
    if rax == 96:  # gettimeofday (Linux x86_64)
        timeval_ptr = uc.reg_read(UC_X86_REG_RDI)
        # struct timeval { tv_sec; tv_usec; } as two little-endian 64-bit fields.
        uc.mem_write(timeval_ptr, struct.pack("<QQ", FAKE_TIME, 0))
        uc.reg_write(UC_X86_REG_RAX, 0)  # return 0 = success
    elif rax == 101:  # ptrace
        uc.reg_write(UC_X86_REG_RAX, 0xFFFFFFFFFFFFFFFF)  # return -1
    elif rax == 60:  # exit
        uc.emu_stop()


def hook_code_write(uc, access, address, size, value, user_data):
    """UC_HOOK_MEM_WRITE callback: log writes that land inside the code range."""
    if CODE_START <= address < CODE_END:
        # The hook fires before the store is committed, so reading memory here
        # would show the old bytes; take the data from ``value`` instead.
        data = (value & ((1 << (size * 8)) - 1)).to_bytes(size, "little")
        print(f"[SMC] 代码段 0x{address:x} 被写入: {data.hex()}")
        decrypted_regions.append((address, size))


# hook_add's third positional parameter is user_data; the instruction id for
# UC_HOOK_INSN is the sixth one, after user_data, begin and end.
uc.hook_add(unicorn.UC_HOOK_INSN, hook_syscall, None, 1, 0, UC_X86_INS_SYSCALL)
uc.hook_add(unicorn.UC_HOOK_MEM_WRITE, hook_code_write, begin=CODE_START, end=CODE_END)
小练习#
# Exercise: emulate a small AArch64 loop that XORs an 8-byte buffer in place
# and report every byte written.
import keystone
import unicorn
from unicorn.arm64_const import *

code = """
loop:
LDRB W3, [X0, X1]
EOR W3, W3, W2
STRB W3, [X0, X1]
ADD X1, X1, #1
CMP X1, #8
B.LT loop
"""

try:
    ks = keystone.Ks(keystone.KS_ARCH_ARM64, keystone.KS_MODE_LITTLE_ENDIAN)
    encoding, count = ks.asm(code)
    CODE = bytes(encoding)
except keystone.KsError as e:
    print(f"ERROR: {e}")
    # Pre-assembled encoding of the loop above, used when assembly fails.
    CODE = bytes([
        0x03, 0x68, 0x61, 0x38,  # LDRB W3, [X0, X1]
        0x63, 0x00, 0x02, 0x4a,  # EOR  W3, W3, W2
        0x03, 0x68, 0x21, 0x38,  # STRB W3, [X0, X1]
        0x21, 0x04, 0x00, 0x91,  # ADD  X1, X1, #1
        0x3f, 0x20, 0x00, 0xf1,  # CMP  X1, #8
        0x6b, 0xff, 0xff, 0x54,  # B.LT loop
    ])

CODE_BASE = 0
BUF_BASE = 0x1000
STACK = 0x4000
buf_data = b"HELLO!!!"

uc = unicorn.Uc(unicorn.UC_ARCH_ARM64, unicorn.UC_MODE_ARM)
uc.mem_map(CODE_BASE, 0x1000)
uc.mem_map(BUF_BASE, 0x1000)
uc.mem_map(STACK, 0x1000)
uc.mem_write(CODE_BASE, CODE)
uc.mem_write(BUF_BASE, buf_data)

uc.reg_write(UC_ARM64_REG_X0, BUF_BASE)  # buffer pointer
uc.reg_write(UC_ARM64_REG_X1, 0x00)      # loop index
uc.reg_write(UC_ARM64_REG_X2, 0x55)      # XOR key


def hook_mem_write(uc, access, address, size, value, user_data):
    """UC_HOOK_MEM_WRITE callback: print each byte stored into the buffer."""
    if BUF_BASE <= address < BUF_BASE + 8:
        idx = address - BUF_BASE
        byte = value & 0xFF
        shown = chr(byte) if 32 <= byte < 127 else '.'
        print(f" buf[{idx}] = 0x{byte:02x} ({shown})")


uc.hook_add(unicorn.UC_HOOK_MEM_WRITE, hook_mem_write)
uc.emu_start(CODE_BASE, CODE_BASE + len(CODE))

result = bytes(uc.mem_read(BUF_BASE, 8))
print(f"\nresult: {result.hex()} = {result}")
了解一下 ADRP (Form PC-Relative Address to Page):
$$ X25 = (\text{PC} \& \sim \text{0xFFF}) + (\text{immediate} \ll 12) $$
此处 #off_212C70@PAGE (immediate) 即目标数据与当前指令页边界的页偏移
- 将当前指令所在地址(PC)的低 12 位清零,即对齐到 4KB 页边界
- 把指令中给出的立即数左移 12 位,结果相加放入目标寄存器
google 会说 ADRP 常与 ADD/LDR 搭配使用
利用按页对齐的特性来设计位置无关代码:ADRP 跨大步找到目标数据所在的 4KB 页面,再用 ADD/LDR 跨小步,加上页内偏移定位到字节
闲得没事算一下,ai 一直算不对
机器码 79 08 00 F0,大端 F0 00 08 79 = 11110000000000000000100001111001
ADRP 标准编码格式:
| 位范围 | 31 | 30:29 | 28:24 | 23:5 | 4:0 |
|---|---|---|---|---|---|
| 含义 | op | immlo | Fixed | immhi | Rd |
| ADRP 特征值 | 1 | (立即数低位) | 10000 | (立即数高位) | (目标寄存器) |
immlo = 11 = 3
immhi = 0000000000001000011 = 67
pc = 0x1031a0
>>> immlo = 3
>>> immhi = 67
>>> imm = (immhi << 2) | immlo
>>> imm
271
>>> hex((pc & ~0xFFF) + (imm << 12))
'0x212000'
IDA 中的符号 #off_212C70@PAGE 表示 212C70 的高位,即页边界,#off_212C70@PAGEOFF 表示低位,即页内偏移,原来 IDA 里面的 ADRP X25, #off_212C70@PAGE 直接当成赋值语句就好了
混淆模式#
有如下的混淆
CMP X19, #0
MOV W9, #0x40 ; '@' ; X9 = 0x40
MOV W10, #0x38 ; '8' ; X10 = 0x38
ADRP X25, #off_212C70@PAGE
CSEL X9, X10, X9, EQ ; if X19 == 0 then X9 = X10
ADD X25, X25, #off_212C70@PAGEOFF ; X25 = 0x212C70
LDR X8, [X25,X9] ; X8 = qword[X25 + X9]
MOV W9, #0xFE53 ; X9 = 0xFE53
MOV W10, #0x82B4 ; X10 = 0x82B4
CSEL X9, X10, X9, EQ ; if X19 == 0 then X9 = X10
SUB X8, X8, X9 ; X8 = X8 - X9
BR X8 ; 跳转到X8
0x212C70 是一个函数表,在代码里面就是 X25。X9 是函数表内的偏移,根据 X19 的值决定偏移的值,共两种情况
但是为什么偏移算完之后那么大,完全不在表里面了,有点反直觉
这样的函数反编译之后大概长这样,修复后是一个巨大的函数
void __fastcall sub_103168(__int64 a1, __int64 a2)
{
__int64 v2; // x9
__int64 v3; // x8
__int64 v4; // x9
_ReadStatusReg(TPIDR_EL0);
v2 = 64;
if ( !a2 )
v2 = 56;
v3 = *(__int64 *)((char *)&off_212C70 + v2);
v4 = 65107;
if ( !a2 )
v4 = 33460;
__asm { BR X8 }
}
下面还有大量这样类似模式的连续的间接跳转。因为 X25 复用所以有点不一样
__int64 sub_103258()
{
__int64 v0; // x25
__int64 v1; // x26
__int64 v2; // x29
__int64 v3; // x0
__int64 v4; // x8
__int64 v5; // x8
__int64 v6; // x9
v3 = sub_1500BC(v2 - 96);
if ( (v1 | 0x2B355CD000000000uLL) >= 0x3CCCE95800000000LL )
v4 = 48;
else
v4 = 0;
v5 = *(_QWORD *)(v0 + v4);
v6 = 49525;
if ( (v1 | 0x2B355CD000000000uLL) < 0x3CCCE95800000000LL )
v6 = 216;
return ((__int64 (__fastcall *)(__int64))(v5 - v6))(v3);
}
以及存在这样的单分支跳转
.text:00000000000FEA1C 9B 08 00 F0 7B 47 3C 91 ADRL X27, unk_211F11
.text:00000000000FEA24 81 08 00 F0 21 B0 3E 91 ADRL X1, unk_211FAC
.text:00000000000FEA2C 22 01 80 52 MOV W2, #9
.text:00000000000FEA30 E3 03 1F 32 MOV W3, #2
.text:00000000000FEA34 E0 03 1B AA MOV X0, X27
.text:00000000000FEA38 A9 9C FC 97 BL sub_25CDC
.text:00000000000FEA3C 88 82 41 F9 LDR X8, [X20,#0x300]
.text:00000000000FEA40 C9 5A 99 92 MOV X9, #0xFFFFFFFFFFFF3529
.text:00000000000FEA44 7F 0B 00 39 STRB WZR, [X27,#(byte_211F13 - 0x211F11)]
.text:00000000000FEA48 08 01 09 8B ADD X8, X8, X9
.text:00000000000FEA4C 00 01 1F D6 BR X8
基址 X25 一直复用导致匹配模式计算跳转目的的思路有点麻烦,考虑模拟执行
大致思路:从函数头开始执行,以翻译块为单位处理。hook_code 遇到 ret 就停止,continue 进下一轮;遇到 bl/svc/非法内存访问则跳过该指令(把 pc 设为下一条指令的地址)。碰到 br 就进入回溯,找 csel、add、sub,patch 后返回分支信息和 context(用于将后续翻译块 bfs 递归处理)。如果是双分支,选 nop 块作为跳板,插入两个分支的跳转,br X8 处 patch 成跳转到这个 chunk;如果是单分支直接在原地 patch。细节见脚本,把原脚本改了挺多。ins_help 是自己写的 capstone 中间层,不是很重要。注意文件偏移和虚拟地址映射
import queue
import struct
import keystone
import unicorn
from capstone.arm64_const import *
from elftools.elf.elffile import ELFFile
from unicorn.arm64_const import *
from unicorn import *
from ins_help import *
# Algorithm outline:
# 1. Put the function entry addresses into the queue.
# 2. Take an address from the queue and start emulating from it.
# 3. While emulating, save every executed instruction and its context on the instruction stack.
# 4. Check whether a `br reg` has been reached.
# 5. If so, take instructions from the instruction stack, compute the target address(es)
#    (single branch or double branch) and patch the jump.
# 6. Put the target address(es) into the queue.
# 7. Stop when a ret or a `bl .__stack_chk_fail` is encountered.
bin_data = None  # immutable copy of the loaded image (set by init_unicorn)
out_data = None  # raw file bytes read by load_elf; patched and written out by deobf
runtime_data = None  # mutable image copy that create_unicorn writes into emulator memory
uc = None  # current Unicorn instance (recreated by create_unicorn)
ins_stack = []  # (address, context, code bytes, instruction) tuples appended by hook_code
block_flow = {}  # block start address -> branch info stored by hook_code
is_success = False  # set by hook_code when a `br` was reached and resolved
img_size = 0  # image size computed by load_elf
# Address range scanned by find2nop for two consecutive NOPs.
jmp_table_start = 0x144320
jmp_table_end = 0x148000
# Entry addresses seeded into the work queue by deobf.
start_addrs = [
0xFD0BC,
0x100124,
#0x100198,
]
def reg_ctou(regname):
    """Convert a capstone AArch64 register name to the Unicorn register constant.

    Handles the named aliases (fp, lr, sp, wsp, xzr, wzr) and the numbered
    general-purpose registers x0..x30 / w0..w30.

    Returns:
        The matching ``UC_ARM64_REG_*`` constant, or None when the name is not
        a recognised general-purpose register.
    """
    regname = regname.lower()
    aliases = {
        'fp': UC_ARM64_REG_FP,
        'lr': UC_ARM64_REG_LR,
        'sp': UC_ARM64_REG_SP,
        'wsp': UC_ARM64_REG_WSP,
        'xzr': UC_ARM64_REG_XZR,
        'wzr': UC_ARM64_REG_WZR,
    }
    if regname in aliases:
        return aliases[regname]

    bank = regname[:1]
    if bank not in ('w', 'x'):
        return None
    try:
        idx = int(regname[1:])
    except ValueError:
        # Not a numbered register (nothing after the bank letter is a number).
        return None
    if not 0 <= idx <= 30:
        return None

    if bank == 'w':
        return idx + UC_ARM64_REG_W0
    # x29 / x30 are not contiguous with x0..x28 in Unicorn's numbering.
    if idx == 29:
        return UC_ARM64_REG_X29
    if idx == 30:
        return UC_ARM64_REG_X30
    return idx + UC_ARM64_REG_X0
def is_ref_ilegel_emm(mu, ins):
    """Return True when ``ins`` accesses memory outside the image and the stack.

    The effective address of the first memory operand is computed from the
    current register values in ``mu`` (base + shifted index + displacement).
    Accesses through ``sp`` and instructions without a memory operand are
    always considered legal.

    NOTE(review): index extend modifiers (uxtw/sxtw) are not applied, only the
    shift amount -- confirm this is sufficient for the target binary.
    """
    if '[' not in ins.op_str:
        return False
    if '[sp' in ins.op_str:
        # sp-relative access: allowed.
        return False

    for op in ins.operands:
        if op.type != ARM64_OP_MEM:
            continue
        addr = 0
        if op.value.mem.base != 0:
            addr += mu.reg_read(reg_ctou(ins.reg_name(op.value.mem.base)))
        if op.value.mem.index != 0:
            index_value = mu.reg_read(reg_ctou(ins.reg_name(op.value.mem.index)))
            if op.shift.value != 0:
                index_value <<= op.shift.value
            addr += index_value
        if op.value.mem.disp != 0:
            addr += op.value.mem.disp
        # Address arithmetic wraps at 64 bits on the real CPU.
        addr &= 0xFFFFFFFFFFFFFFFF

        if 0x0 <= addr < img_size:
            # Data inside the loaded image: allowed (img_size itself is one
            # past the last valid byte).
            return False
        if 0x80000000 <= addr < 0x80000000 + 0x1000 * 0x1000 * 8:
            # Data inside the emulated stack region: allowed.
            return False
        return True

    # '[' appeared in the operand string but no memory operand was decoded.
    return False
def set_context(uc, regs):
    """Load a register snapshot produced by ``get_context`` into ``uc``.

    ``regs`` holds x0..x28 at indices 0..28, then fp, lr and sp at 29, 30 and
    31.  Passing None leaves the registers untouched.
    """
    if regs is None:
        return
    for i in range(29):  # x0 ~ x28
        uc.reg_write(UC_ARM64_REG_X0 + i, regs[i])
    uc.reg_write(UC_ARM64_REG_FP, regs[29])  # fp
    uc.reg_write(UC_ARM64_REG_LR, regs[30])  # lr
    uc.reg_write(UC_ARM64_REG_SP, regs[31])  # sp
def get_context(uc):
    """Snapshot the general-purpose registers of ``uc``.

    Returns:
        A 32-element list: x0..x28 at indices 0..28, then fp, lr, sp.
    """
    regs = [uc.reg_read(UC_ARM64_REG_X0 + i) for i in range(29)]
    regs.append(uc.reg_read(UC_ARM64_REG_FP))
    regs.append(uc.reg_read(UC_ARM64_REG_LR))
    regs.append(uc.reg_read(UC_ARM64_REG_SP))
    return regs
def get_context_reg(context, reg):
    """Read the value of Unicorn register ``reg`` from a ``get_context`` snapshot.

    Zero registers read as 0 and W registers return the low 32 bits of the
    corresponding X register slot.

    Raises:
        KeyError: if ``reg`` is not covered by the snapshot layout.
    """
    if reg in (UC_ARM64_REG_XZR, UC_ARM64_REG_WZR):
        return 0
    if UC_ARM64_REG_X0 <= reg <= UC_ARM64_REG_X28:
        return context[reg - UC_ARM64_REG_X0]
    if UC_ARM64_REG_W0 <= reg <= UC_ARM64_REG_W30:
        # w29 / w30 land on snapshot slots 29 / 30, i.e. fp / lr.
        return context[reg - UC_ARM64_REG_W0] & 0xFFFFFFFF
    if reg == UC_ARM64_REG_FP:
        return context[29]
    if reg == UC_ARM64_REG_LR:
        return context[30]
    if reg == UC_ARM64_REG_SP:
        return context[31]
    raise KeyError('unsupported reg const: %r' % (reg,))
def format_bytes(data):
    """Render an iterable of byte values as space-separated lowercase hex pairs."""
    return ' '.join('%02x' % b for b in data)
def dump_ins_stack(ins_stack, limit=8):
    """Print the last ``limit`` traced instructions of the current block.

    Each entry of ``ins_stack`` is an ``(address, context, code, ins)`` tuple.
    """
    print('[+] recent block tail:')
    # A plain [-limit:] slice with limit == 0 would select the whole list.
    tail = ins_stack[-limit:] if limit > 0 else []
    for addr, _, code, ins in tail:
        print(' 0x%x:\t%s\t%s\t%s' % (
            addr,
            format_bytes(code),
            ins.mnemonic,
            ins.op_str,
        ))
def create_unicorn():
    """Create a fresh emulator, load the current image into it and install hooks.

    The new instance is stored in the module-level ``uc`` and also returned.
    """
    global uc
    region_size = 8 * 0x1000 * 0x1000
    stack_base = 0x80000000

    uc = Uc(UC_ARCH_ARM64, UC_MODE_ARM)
    uc.mem_map(stack_base, region_size)  # stack region
    uc.mem_map(0, region_size)           # image region, loaded at address 0
    uc.mem_write(0, bytes(runtime_data))
    uc.reg_write(UC_ARM64_REG_SP, stack_base + 0x1000 * 0x1000 * 6)
    uc.hook_add(UC_HOOK_CODE, hook_code)
    uc.hook_add(UC_HOOK_MEM_UNMAPPED, hook_mem_access)
    return uc
def apply_patch(addr, new_bytes):
    """Write ``new_bytes`` at ``addr`` into both the runtime image and the output file.

    Before patching, the current bytes from the live emulator (if any), the
    runtime image, the output buffer and the original image are printed, with
    a warning when they disagree.

    NOTE(review): ``addr`` is used to index both ``runtime_data`` (laid out by
    virtual address) and ``out_data`` (raw file bytes); this is only correct
    where file offset and virtual address coincide -- confirm for the target.

    Raises:
        ValueError: if the patch would fall outside either buffer.
    """
    global out_data
    global runtime_data
    new_bytes = bytes(new_bytes)
    end = addr + len(new_bytes)
    # Slice assignment past the end of a bytearray silently appends, which
    # would corrupt the output instead of patching it.
    if addr < 0 or end > len(runtime_data) or end > len(out_data):
        raise ValueError('patch range out of bounds: addr=0x%x size=0x%x' % (addr, len(new_bytes)))

    live_bytes = bytes(uc.mem_read(addr, len(new_bytes))) if uc is not None else b''
    exec_bytes = bytes(runtime_data[addr:end])
    file_bytes = bytes(out_data[addr:end])
    orig_bytes = bytes(bin_data[addr:end])

    if live_bytes:
        print("live : %s" % format_bytes(live_bytes))
    print("exec : %s" % format_bytes(exec_bytes))
    print("file : %s" % format_bytes(file_bytes))
    print("orig : %s" % format_bytes(orig_bytes))
    if live_bytes and live_bytes != exec_bytes:
        print('warning! live and exec not match at %x' % (addr))
    if file_bytes != exec_bytes:
        print('warning! file and exec not match at %x' % (addr))

    runtime_data[addr:end] = new_bytes
    out_data[addr:end] = new_bytes
    print('patch code: %x\t%s => %s' % (addr, format_bytes(exec_bytes), format_bytes(new_bytes)))
def branch_reach_limit(mnemonic):
    """Return the ``(low, high)`` byte-offset range reachable by a branch mnemonic.

    ``b`` uses a 26-bit immediate (imm26 << 2); any other mnemonic that starts
    with ``b`` but not ``bl`` is treated as a conditional branch with a 19-bit
    immediate (imm19 << 2).  The range is half-open: low <= delta < high.

    Returns:
        The range tuple, or None when no limit is known for the mnemonic.
    """
    mnemonic = mnemonic.lower()
    if mnemonic == 'b':
        # imm26 << 2
        return (-0x8000000, 0x8000000)
    if mnemonic.startswith('b') and not mnemonic.startswith('bl'):
        # imm19 << 2
        return (-0x100000, 0x100000)
    return None
def assemble_branch(src, dest, mnemonic, block_base=None):
    """Assemble ``<mnemonic> <dest>`` for placement at address ``src``.

    Args:
        src: address the branch instruction will be written to.
        dest: branch target address.
        mnemonic: branch mnemonic passed to keystone (e.g. ``b``).
        block_base: start of the block being processed; only used in error
            messages (printed as 0 when None).

    Returns:
        The encoding produced by keystone.

    Raises:
        ValueError: if the target is None, misaligned, outside the image, out
            of the mnemonic's reach, or keystone rejects the instruction.
    """
    base = block_base if block_base is not None else 0

    if dest is None:
        raise ValueError('branch target is None: block=0x%x src=0x%x ins=%s' % (
            base,
            src,
            mnemonic,
        ))
    if dest & 0x3:
        raise ValueError('branch target is not 4-byte aligned: block=0x%x src=0x%x dest=0x%x ins=%s' % (
            base,
            src,
            dest,
            mnemonic,
        ))
    if not (0 <= dest < img_size):
        raise ValueError('branch target is outside image: block=0x%x src=0x%x dest=0x%x img_size=0x%x ins=%s' % (
            base,
            src,
            dest,
            img_size,
            mnemonic,
        ))

    reach = branch_reach_limit(mnemonic)
    if reach is not None:
        delta = dest - src
        if not (reach[0] <= delta < reach[1]):
            raise ValueError('branch target out of range: block=0x%x src=0x%x dest=0x%x delta=%#x ins=%s limit=[%#x, %#x)' % (
                base,
                src,
                dest,
                delta,
                mnemonic,
                reach[0],
                reach[1],
            ))

    asm = '%s %s' % (mnemonic, hex(dest))
    ks = keystone.Ks(keystone.KS_ARCH_ARM64, keystone.KS_MODE_LITTLE_ENDIAN)
    try:
        # Passing src as the base address makes keystone emit the correct
        # PC-relative offset.
        return ks.asm(asm, src)[0]
    except keystone.KsError as exc:
        raise ValueError('keystone failed: block=0x%x src=0x%x dest=0x%x asm=`%s` error=%s' % (
            base,
            src,
            dest,
            asm,
            exc,
        )) from exc
def get_double_branch(uc, ins_stack):
    """Resolve both targets of a CSEL-driven indirect jump.

    Walks ``ins_stack`` backwards from the trailing ``br`` looking for the
    pattern::

        CSEL Xi, Xa, Xb, cond      ; table index            (csel #2)
        LDR  Xt, [Xbase, Xi]       ; load table entry
        CSEL Xk, Xc, Xd, cond      ; per-branch constant    (csel #1)
        SUB/ADD Xt, Xt, Xk         ; final target
        BR   Xt

    Register values are taken from the context captured just before each
    instruction executed; table entries are read from emulator memory.

    Returns:
        ``(target_if_cond, target_if_not_cond, context, cond)`` or None when
        the pattern is not found.  ``cond`` is the condition string of the
        CSEL feeding the SUB/ADD.
    """
    mask64 = 0xFFFFFFFFFFFFFFFF
    flag_br = False
    flag_sub_add = False
    flag_ldr = False
    flag_csel1 = False
    flag_csel2 = False
    br_reg = None
    op_reg1 = None
    op_reg2 = None
    op_reg3 = None
    reg2_value1 = None
    reg2_value2 = None
    reg3_value1 = None
    reg3_value2 = None
    ldr_index_scale = 1
    table_base = None
    is_sub = True
    cond = ''

    for addr, context, _, ins in reversed(ins_stack):
        mnemonic = ins.mnemonic.lower()

        # BR X8
        if mnemonic == 'br' and not flag_br:
            flag_br = True
            br_reg = ins.operands[0].reg

        # SUB X8, X8, X9  (or ADD)
        if flag_br and mnemonic in ('add', 'sub') and not flag_sub_add \
                and ins.operands[0].reg == br_reg:
            if ins.operands[1].type == ARM64_OP_REG and ins.operands[2].type == ARM64_OP_REG:
                op_reg1 = ins.operands[1].reg
                op_reg2 = ins.operands[2].reg
                # Remember the operation so the target is computed the same
                # way the instruction computes it.
                is_sub = (mnemonic == 'sub')
                flag_sub_add = True

        # CSEL X9, X10, X9, EQ  -- selects the constant applied by SUB/ADD
        if flag_sub_add and mnemonic == 'csel' and not flag_csel1 \
                and ins.operands[0].reg == op_reg2:
            cond = ins.op_str.split(', ')[-1]
            regname1 = ins.reg_name(ins.operands[1].reg)
            regname2 = ins.reg_name(ins.operands[2].reg)
            reg2_value1 = get_context_reg(context, reg_ctou(regname1))
            reg2_value2 = get_context_reg(context, reg_ctou(regname2))
            flag_csel1 = True

        # LDR X8, [X25,X9]
        if flag_sub_add and mnemonic == 'ldr' and not flag_ldr \
                and ins.operands[0].reg == op_reg1:
            mem_op = None
            for op in ins.operands:
                if op.type == ARM64_OP_MEM:
                    mem_op = op
                    break
            # A load without a memory operand (e.g. a literal load) is not the
            # table load; leave it unmatched instead of aborting.
            if mem_op is not None:
                table_base = 0
                if mem_op.mem.base != 0:
                    table_base += get_context_reg(context, reg_ctou(ins.reg_name(mem_op.mem.base)))
                if mem_op.mem.disp != 0:
                    table_base += mem_op.mem.disp
                if mem_op.mem.index != 0:
                    op_reg3 = reg_ctou(ins.reg_name(mem_op.mem.index))
                    if mem_op.shift.value != 0:
                        ldr_index_scale = 1 << mem_op.shift.value
                flag_ldr = True

        # CSEL X9, X10, X9, EQ  -- selects the table index used by LDR
        if flag_ldr and mnemonic == 'csel' and not flag_csel2 \
                and reg_ctou(ins.reg_name(ins.operands[0].reg)) == op_reg3:
            regname1 = ins.reg_name(ins.operands[1].reg)
            regname2 = ins.reg_name(ins.operands[2].reg)
            reg3_value1 = get_context_reg(context, reg_ctou(regname1))
            reg3_value2 = get_context_reg(context, reg_ctou(regname2))
            flag_csel2 = True

    if not (flag_csel1 and flag_csel2):
        return None

    def _target(index_value, key_value):
        # Read the 8-byte little-endian table entry from emulator memory
        # (virtual addresses; mind the file-offset/VA difference if this is
        # ever changed to read from the file) and apply the SUB/ADD with
        # 64-bit wraparound.
        raw = uc.mem_read(table_base + index_value * ldr_index_scale, 8)
        entry = struct.unpack('<Q', bytes(raw))[0]
        if is_sub:
            return (entry - key_value) & mask64
        return (entry + key_value) & mask64

    # Branch taken when the condition holds.
    offset1 = _target(reg3_value1, reg2_value1)
    # Branch taken when the condition does not hold.
    offset2 = _target(reg3_value2, reg2_value2)
    return (offset1, offset2, get_context(uc), cond)
def get_single_branch(uc, ins_stack):
    """Resolve the target of a plain ``br reg`` at the end of ``ins_stack``.

    The target is the value the branch register held in the context captured
    just before the ``br`` executed.

    Returns:
        ``(target, context)`` or None when the last traced instruction is not
        a ``br`` (or nothing was traced).
    """
    if not ins_stack:
        return None
    _, context, _, ins = ins_stack[-1]
    if ins.mnemonic.lower() != 'br':
        return None
    # get_context_reg understands the snapshot layout, including x29/x30,
    # which plain "constant - X0" indexing does not.
    target_reg = reg_ctou(ins.reg_name(ins.operands[0].reg))
    return (get_context_reg(context, target_reg), get_context(uc))
def find2nop(uc):
    """Find an 8-byte slot holding two consecutive NOPs in the output buffer.

    Scans ``out_data`` from ``jmp_table_start`` to ``jmp_table_end`` in 8-byte
    steps.  ``uc`` is accepted for interface compatibility and not used.

    Returns:
        The address of the first matching slot, or None if there is none.
    """
    # AArch64 NOP is 0xD503201F, stored little-endian.  Comparing raw bytes
    # avoids failing on slots that do not disassemble into two instructions.
    two_nops = bytes([0x1f, 0x20, 0x03, 0xd5]) * 2
    for addr in range(jmp_table_start, jmp_table_end, 8):
        if bytes(out_data[addr: addr + 8]) == two_nops:
            return addr
    return None
def patch_single_branch(src, dest, block_base=None):
    """Replace the instruction at ``src`` with an unconditional ``b dest``.

    Raises:
        ValueError: propagated from ``assemble_branch`` when the branch cannot
            be encoded.
    """
    jmp_bin = assemble_branch(src, dest, 'b', block_base)
    apply_patch(src, jmp_bin)
def patch_double_branch(uc, addr, branch, block_base=None):
    """Rewrite a two-way indirect jump as direct branches via a NOP trampoline.

    ``branch`` is the tuple returned by ``get_double_branch``:
    ``(target_if_cond, target_if_not_cond, context, cond)``.  The ``br`` at
    ``addr`` becomes ``b <slot>``; the two-NOP slot becomes
    ``b<cond> target_if_cond`` followed by ``b target_if_not_cond``.

    All three instructions are assembled before anything is patched, so an
    encoding failure leaves the image untouched.

    Raises:
        ValueError: if no free two-NOP slot exists or a branch cannot be
            encoded.
    """
    nop_addr = find2nop(uc)
    if nop_addr is None:
        # Raised as ValueError (not an assert) so the caller's existing
        # ValueError handling reports it like any other patch failure.
        raise ValueError('no find 2 nop')
    offset1 = branch[0]
    offset2 = branch[1]
    cond = branch[3]

    # 1. Turn the `br reg` into a jump to the trampoline slot.
    jmp1_bin = assemble_branch(addr, nop_addr, 'b', block_base)
    # 2. b<cond> target taken when the condition holds.
    jmp2_bin = assemble_branch(nop_addr, offset1, 'b' + cond, block_base)
    # 3. b target taken otherwise.
    jmp3_bin = assemble_branch(nop_addr + 4, offset2, 'b', block_base)

    apply_patch(addr, jmp1_bin)
    apply_patch(nop_addr, jmp2_bin)
    apply_patch(nop_addr + 4, jmp3_bin)
def hook_code(uc, address, size, user_data):
    """UC_HOOK_CODE callback that traces one block and resolves its final ``br``.

    For every instruction it records ``(address, context, code, ins)`` on
    ``ins_stack``, then:
      * stops on ``ret`` and on ``bl`` to 0x237C0 (``__stack_chk_fail``);
      * skips ``bl*``, ``svc*`` and out-of-image memory accesses by moving PC
        past the instruction;
      * on ``br`` resolves the target(s), patches the jump, stores the result
        in ``block_flow`` under the block's first address, sets ``is_success``
        accordingly and stops emulation.
    """
    global ins_stack
    global is_success

    if is_success:
        uc.emu_stop()
        return

    ins_help = InsHelp()
    code = uc.mem_read(address, size)
    ins = list(ins_help.disasm(code, address, False))[0]
    print("[+] tracing instruction\t0x%x:\t%s\t%s\t%s" % (
        ins.address,
        ' '.join(f"{b:02x}" for b in ins.bytes),
        ins.mnemonic,
        ins.op_str))

    # Record the instruction together with the context it is about to run in.
    ins_stack.append((address, get_context(uc), bytes(code), ins))
    mnemonic = ins.mnemonic.lower()

    # Stop at ret.
    if mnemonic == 'ret':
        print("[+] encountered ret, stop")
        ins_stack.clear()
        uc.emu_stop()
        return

    # Stop at bl .__stack_chk_fail.
    if mnemonic == 'bl' and ins.operands[0].imm == 0x237C0:
        print("[+] encountered bl .__stack_chk_fail, stop")
        ins_stack.clear()
        uc.emu_stop()
        return

    # Skip calls, svc, and memory accesses outside the image/stack.
    if mnemonic.startswith('bl') or is_ref_ilegel_emm(uc, ins) or mnemonic.startswith('svc'):
        print("[+] pass instruction 0x%x\t%s\t%s" % (ins.address, ins.mnemonic, ins.op_str))
        uc.reg_write(UC_ARM64_REG_PC, address + size)
        return

    if mnemonic != 'br':
        return

    # Reached the indirect jump that ends this block.
    is_success = True
    block_base = ins_stack[0][0]
    jmp_addr = ins_stack[-1][0]

    ret = get_double_branch(uc, ins_stack)
    if ret is not None:
        print('find double branch: %x => %x, %x' % (block_base, ret[0], ret[1]))
        try:
            patch_double_branch(uc, jmp_addr, ret, block_base)
        except ValueError as exc:
            print("[+] patch double branch failed for block 0x%x at br 0x%x: %s" % (
                block_base,
                jmp_addr,
                exc,
            ))
            dump_ins_stack(ins_stack)
            is_success = False
        else:
            block_flow[block_base] = ret
    else:
        ret = get_single_branch(uc, ins_stack)
        if ret is None:
            print("[+] find dest failed 0x%x\t%s\t%s" % (ins.address, ins.mnemonic, ins.op_str))
            print("[+] unresolved block base: 0x%x" % (block_base))
            dump_ins_stack(ins_stack)
            is_success = False
        else:
            print('find single branch: %x => %x' % (block_base, ret[0]))
            try:
                patch_single_branch(jmp_addr, ret[0], block_base)
            except ValueError as exc:
                print("[+] patch single branch failed for block 0x%x at br 0x%x: %s" % (
                    block_base,
                    jmp_addr,
                    exc,
                ))
                dump_ins_stack(ins_stack)
                is_success = False
            else:
                block_flow[block_base] = ret

    ins_stack.clear()
    uc.emu_stop()
    return
def hook_mem_access(uc, type, address, size, value, userdata):
    """UC_HOOK_MEM_UNMAPPED callback: report the access and stop emulation.

    Always returns False, i.e. the access is not handled.
    """
    pc = uc.reg_read(UC_ARM64_REG_PC)
    print('error! access invalid mem, pc:%x type:%d addr:%x size:%x' % (pc, type, address, size))
    uc.emu_stop()
    return False
def load_elf(filename):
    """Load the PT_LOAD segments of an ELF file into a flat image.

    Side effects: sets the module-level ``out_data`` to the raw file bytes and
    ``img_size`` to the size of the image.

    Returns:
        A bytearray of ``img_size`` bytes with each segment's file contents
        placed at its virtual address; everything else is zero.

    Raises:
        ValueError: if the file has no PT_LOAD segments.
    """
    global img_size
    global out_data
    segs = []
    with open(filename, 'rb') as f:
        out_data = bytearray(f.read())
        # seg.data() reads from the stream, so it must run while f is open.
        for seg in ELFFile(f).iter_segments('PT_LOAD'):
            print('file_off:%s, va: %s, size: %s' % (hex(seg['p_offset']), hex(seg['p_vaddr']), hex(seg['p_filesz'])))
            segs.append((seg['p_vaddr'], seg['p_filesz'], seg['p_memsz'], seg.data()))

    if not segs:
        raise ValueError('no PT_LOAD segments in %r' % (filename,))

    # Size the image by the highest in-memory end address over all segments,
    # so zero-initialised tails (p_memsz > p_filesz) are part of the image
    # and segment order in the program header table does not matter.
    img_size = max(vaddr + max(filesz, memsz) for vaddr, filesz, memsz, _ in segs)
    byte_arr = bytearray(img_size)
    for vaddr, filesz, _, data in segs:
        byte_arr[vaddr: vaddr + filesz] = bytearray(data)
    return byte_arr
def init_unicorn(file_name):
    """Load ``file_name`` and reset the module state used by the emulator.

    ``bin_data`` keeps an immutable copy of the loaded image, ``runtime_data``
    a mutable copy that receives patches; ``uc`` is cleared so the next run
    creates a fresh emulator.
    """
    global bin_data
    global runtime_data
    global uc
    # Load the .so image into memory.
    bin_data = bytes(load_elf(file_name))
    runtime_data = bytearray(bin_data)
    uc = None
def run(addr, context):
    """Emulate one block starting at ``addr`` with the given register context.

    A fresh emulator is created for every run.  Emulation returns once
    ``hook_code`` has called ``emu_stop``.

    Returns:
        The branch info stored in ``block_flow`` for ``addr`` when the block
        ended in a resolved ``br``; otherwise None.
    """
    global uc
    global ins_stack
    global is_success
    global block_flow

    ins_stack.clear()
    create_unicorn()
    set_context(uc, context)
    try:
        uc.emu_start(addr, 0x10000)
    except UcError as exc:
        # An emulation fault in one block (e.g. an unmapped access reported by
        # hook_mem_access) should not abort the whole traversal.
        print('[+] emulation failed for block 0x%x: %s' % (addr, exc))
        is_success = False
        return None

    if is_success:
        is_success = False
        return block_flow[addr]  # branch info and context
    return None
def deobf(filename='', patched_filename='out.so'):
    """Resolve and patch indirect jumps reachable from ``start_addrs``.

    Performs a breadth-first traversal: each block is emulated with ``run``,
    and the branch targets it reports are queued together with the register
    context at the jump.  Finally the discovered flow is printed and the
    patched file is written.

    Args:
        filename: path of the input ELF (.so) file.
        patched_filename: path the patched file is written to.
    """
    init_unicorn(filename)

    q = queue.Queue()
    for start_addr in start_addrs:
        # Function entries are the first nodes; queue items are (address, context).
        q.put((start_addr, None))
    traced = set()  # blocks already emulated
    pending = set(start_addrs)  # blocks queued but not yet emulated

    def enqueue(target, ctx):
        # Queue a block once: skip targets already run or already waiting.
        if target not in traced and target not in pending:
            q.put((target, ctx))
            pending.add(target)

    while not q.empty():
        addr, context = q.get()
        pending.discard(addr)
        if addr in traced:
            continue
        traced.add(addr)

        s = run(addr, context)  # emulate until a `br reg` is found
        if s is None:
            continue
        if len(s) == 2:  # single branch: (target, context)
            enqueue(s[0], s[1])
        else:  # double branch: (target1, target2, context, cond)
            enqueue(s[0], s[2])
            enqueue(s[1], s[2])

    # Print the recovered control flow.
    for addr in block_flow:
        if len(block_flow[addr]) == 4:
            print('%s => %s, %s, %s' % (hex(addr), hex(block_flow[addr][0]), hex(block_flow[addr][1]), block_flow[addr][3]))
        else:
            print('%s => %s' % (hex(addr), hex(block_flow[addr][0])))

    # Save the patched .so.
    with open(patched_filename, 'wb') as f:
        f.write(out_data)


if __name__ == '__main__':
    import sys

    if len(sys.argv) < 2:
        print('usage: %s <input.so> [output.so]' % sys.argv[0])
        sys.exit(1)
    deobf(*sys.argv[1:3])