@source: https://bbs.kanxue.com/thread-280231-1.htm

Unicorn#

基于 qemu,支持多架构 CPU

   Python/C 脚本
    Unicorn API
    QEMU TCG (Tiny Code Generator)  ← 核心翻译引擎
    翻译为宿主机原生指令执行

核心流程#

  # unicorn_skeleton.py
  import unicorn
  from unicorn.arm64_const import *

  # Step 1: create the engine -- pick the architecture and the mode.
  uc = unicorn.Uc(unicorn.UC_ARCH_ARM64, unicorn.UC_MODE_ARM)

  # Step 2: map memory -- this defines the sandbox boundary.
  uc.mem_map(0x1000, 0x4000)   # 16 KB starting at 0x1000

  # Step 3: write data -- load the code and the initial state.
  # Four AArch64 NOPs (0xD503201F, little-endian) cover 0x1000..0x1010.
  # An all-zero word is not a valid A64 instruction, so zero-filled "code"
  # would make emu_start() fail on the very first instruction.
  uc.mem_write(0x1000, b"\x1f\x20\x03\xd5" * 4)
  uc.reg_write(UC_ARM64_REG_SP, 0x4000)

  # Step 4: install hooks -- optional observation points.
  def on_insn(uc, address, size, user_data):
      """Print the address of every instruction about to execute."""
      print(f"PC=0x{address:x}")
  uc.hook_add(unicorn.UC_HOOK_CODE, on_insn)

  # Step 5: start emulation -- give the begin and end addresses.
  uc.emu_start(begin=0x1000, until=0x1010)

内存映射需要页对齐(4KB)

hook#

可以对特定事件插桩

  Unicorn 执行引擎
         ├── 执行每条指令前  →  UC_HOOK_CODE
         ├── 读内存时        →  UC_HOOK_MEM_READ
         ├── 写内存时        →  UC_HOOK_MEM_WRITE
         ├── 访问未映射内存  →  UC_HOOK_MEM_UNMAPPED
         ├── 执行中断/syscall →  UC_HOOK_INTR (中断/SVC) / UC_HOOK_INSN (x86 SYSCALL 等特定指令)
         └── 遇到无效指令    →  UC_HOOK_INSN_INVALID
# trace_example.py
import unicorn, capstone
from unicorn.arm64_const import *

# Three AArch64 instructions, each stored as its little-endian 32-bit encoding.
CODE = bytes([
    0x20, 0x00, 0x80, 0xD2,  # MOV X0, #1
    0x41, 0x00, 0x80, 0xD2,  # MOV X1, #2
    0x02, 0x00, 0x01, 0x8B,  # ADD X2, X0, X1
])
BASE = 0x1000  # address the code is written to and executed from

# ARM64 engine with a single 4 KB page at BASE that holds CODE.
uc = unicorn.Uc(unicorn.UC_ARCH_ARM64, unicorn.UC_MODE_ARM)
uc.mem_map(BASE, 0x1000)
uc.mem_write(BASE, CODE)

# Disassembler used by hook_code to print each traced instruction.
# NOTE(review): CS_ARCH_AARCH64 looks like the newer Capstone spelling; older
# releases call it CS_ARCH_ARM64 -- confirm against the installed version.
cs = capstone.Cs(capstone.CS_ARCH_AARCH64, capstone.CS_MODE_ARM)

def hook_code(uc, address, size, user_data):
    """Disassemble and print the instruction that is about to execute.

    Reads ``size`` bytes at ``address`` from the emulator and prints one line
    per instruction Capstone decodes from them.
    """
    raw = bytes(uc.mem_read(address, size))
    for decoded in cs.disasm(raw, address):
        line = f"  [0x{address:08x}]  {decoded.mnemonic:<8} {decoded.op_str}"
        print(line)

def hook_mem_invalid(uc, access_type, address, size, value, user_data):
    """Report an access to unmapped memory and decide whether to continue.

    ``access_type`` is one of:

    - UC_MEM_READ_UNMAPPED
    - UC_MEM_WRITE_UNMAPPED
    - UC_MEM_FETCH_UNMAPPED (executing from an unmapped address)

    Returns False for a fetch, which stops emulation. For a read or a write
    the 4 KB page containing ``address`` is mapped and True is returned so the
    access is retried.
    """
    labels = dict([
        (unicorn.UC_MEM_READ_UNMAPPED, "READ"),
        (unicorn.UC_MEM_WRITE_UNMAPPED, "WRITE"),
        (unicorn.UC_MEM_FETCH_UNMAPPED, "FETCH"),
    ])
    kind = labels[access_type]
    print(f"[!] {kind} @ 0x{address:x} (size={size})")

    if access_type != unicorn.UC_MEM_FETCH_UNMAPPED:
        # Data access: map the missing page on demand and retry.
        uc.mem_map(address & ~0xFFF, 0x1000)
        return True
    return False

# Trace every instruction, handle accesses to unmapped memory, then run CODE
# from its first byte up to (not including) the byte just past its end.
uc.hook_add(unicorn.UC_HOOK_CODE, hook_code)
uc.hook_add(unicorn.UC_HOOK_MEM_UNMAPPED, hook_mem_invalid)
uc.emu_start(BASE, BASE + len(CODE))

hook 地址范围过滤

# Only trace code executed between 0x1000 and 0x2000.
# NOTE(review): Unicorn documents begin/end as inclusive bounds -- confirm.
  uc.hook_add(unicorn.UC_HOOK_CODE, hook_code, begin=0x1000, end=0x2000)

  # Only watch memory writes that hit the range starting at 0x5000.
  uc.hook_add(unicorn.UC_HOOK_MEM_WRITE, hook_write, begin=0x5000,
  end=0x5004)

TCG 执行原理#

   ARM64 字节码
  ┌─────────────────────────────────┐
  │         QEMU TCG 前端            │
  │  ARM64 解码器 → TCG IR 中间表示   │
  │                                 │
  │  ADD X2, X0, X1                 │
  │    ↓ 翻译为 TCG IR              │
  │  tcg_gen_add_i64(X2, X0, X1)   │
  └─────────────────────────────────┘
  ┌─────────────────────────────────┐
  │         QEMU TCG 后端           │
  │  把 TCG IR 编译为宿主机指令      │
  │  (Mac 是 ARM64,直接接近        │
  │   原生;x86 Mac 需要翻译)       │
  └─────────────────────────────────┘
     翻译块 (Translation Block, TB)
     缓存起来,下次同样代码直接复用
      CPU 执行

翻译块 (TB)#

一段连续直线执行的代码,遇到跳转就结束

    0x1000: MOV X0, #1    ┐
    0x1004: MOV X1, #2    ├── TB1(直线,无跳转)
    0x1008: ADD X2, X0, X1┘
    0x100C: CBZ X2, 0x2000    ← 条件跳转,TB1 结束

    0x1010: MOV X3, #9    ┐
    0x1014: RET           ┘── TB2

    0x2000: MOV X3, #0    ┐
    0x2004: RET           ┘── TB3
  1. Unicorn 在翻译每个 TB 时为每条指令注入了回调点(UC_HOOK_CODE),因此 hook 的性能开销在复杂循环中很显著,善用地址范围过滤精准插桩
  2. 第一次执行需要翻译,比后续慢,之后命中 TB Cache 直接执行
  3. 修改内存后要调用 emu_stop() 再重启:旧的 TB 可能已经缓存,强制清除缓存后重新翻译

反调试对抗#

| 反调试手段 | 检测方式 | Unicorn 对策 |
| --- | --- | --- |
| ptrace self | syscall 返回值 | hook syscall 伪造返回值 |
| 时间检测 | 两次 gettimeofday 比较 | hook syscall,返回固定值 |
| smc | 运行时解密 | UC_HOOK_MEM_WRITE 捕获解密时机 |
# syscall_smc_hook.py
import unicorn
from unicorn.x86_const import *

FAKE_TIME = 1700000000 # fake timestamp (seconds) reported to the guest
# Address range treated as code when watching for self-modifying code (SMC).
CODE_START = 0x1000
CODE_END = 0x5000
decrypted_regions = []  # (address, size) of every write seen inside that range

# x86-64 engine the hooks below are attached to.
uc = unicorn.Uc(unicorn.UC_ARCH_X86, unicorn.UC_MODE_64)

def hook_syscall(uc, user_data):
    """Fake the results of selected Linux x86_64 syscalls.

    The syscall number is read from RAX:

    - 96 (gettimeofday): write a fixed timeval to the buffer in RDI, return 0.
    - 101 (ptrace): return -1.
    - 60 (exit): stop the emulation.

    Any other syscall number is ignored.
    """
    import struct

    number = uc.reg_read(UC_X86_REG_RAX)

    if number == 60:  # exit
        uc.emu_stop()
    elif number == 101:  # ptrace
        uc.reg_write(UC_X86_REG_RAX, 0xFFFFFFFFFFFFFFFF)  # -1
    elif number == 96:  # gettimeofday (Linux x86_64)
        timeval_ptr = uc.reg_read(UC_X86_REG_RDI)
        fake_timeval = struct.pack("<QQ", FAKE_TIME, 0)
        uc.mem_write(timeval_ptr, fake_timeval)
        uc.reg_write(UC_X86_REG_RAX, 0)  # 0 = success

def hook_code_write(uc, access, address, size, value, user_data):
    """Log writes that land inside the code range (self-modifying code).

    A write hook runs before the store is committed, so reading the target
    address here would return the old bytes. The bytes being written are
    taken from ``value`` instead.

    Every matching write is recorded as ``(address, size)`` in
    ``decrypted_regions``.
    """
    if CODE_START <= address < CODE_END:
        # ``value`` is a (possibly signed) 64-bit integer; mask it to the
        # access width. Accesses wider than 8 bytes only show the low 8 bytes.
        width = min(size, 8)
        data = (value & ((1 << (8 * width)) - 1)).to_bytes(width, "little")
        print(f"[SMC] 代码段 0x{address:x} 被写入: {data.hex()}")
        decrypted_regions.append((address, size))



# UC_HOOK_INSN needs the instruction id in the argument after user_data, begin
# and end. Passed as the third positional argument it would be taken as
# user_data and the hook would not be bound to SYSCALL. The values 1 and 0 are
# hook_add's own defaults for begin and end (hook every address).
uc.hook_add(unicorn.UC_HOOK_INSN, hook_syscall, None, 1, 0, unicorn.x86_const.UC_X86_INS_SYSCALL)
# The callback re-checks the range itself, so the bounds given here only limit
# how often it is invoked.
uc.hook_add(unicorn.UC_HOOK_MEM_WRITE, hook_code_write, begin=CODE_START, end=CODE_END)

小练习#

import unicorn
from unicorn.arm64_const import *
from keystone import *

# XOR loop: for X1 in 0..7, buf[X1] ^= W2, where X0 points at buf.
code = """
loop:
    LDRB W3, [X0, X1]     
    EOR  W3, W3, W2       
    STRB W3, [X0, X1]     
    ADD  X1, X1, #1       
    CMP  X1, #8
    B.LT loop             
    """
# Assemble with Keystone; if that fails, fall back to the pre-assembled bytes.
# Only ``from keystone import *`` is in effect, so the star-imported names are
# used directly rather than through a ``keystone.`` prefix.
try:
    ks = Ks(KS_ARCH_ARM64, KS_MODE_LITTLE_ENDIAN)
    encoding, count = ks.asm(code)
    CODE = bytes(encoding)
except KsError as e:
    print(f"ERROR: {e}")
    CODE = bytes([
        0x03, 0x68, 0x61, 0x38,  # LDRB W3, [X0, X1]
        0x63, 0x00, 0x02, 0x4a,  # EOR  W3, W3, W2
        0x03, 0x68, 0x21, 0x38,  # STRB W3, [X0, X1]
        0x21, 0x04, 0x00, 0x91,  # ADD  X1, X1, #1
        0x3f, 0x20, 0x00, 0xf1,  # CMP  X1, #8
        0x6b, 0xff, 0xff, 0x54,  # B.LT loop
    ])

# Memory layout: one 4 KB page each for the code, the data buffer and the stack.
CODE_BASE = 0
BUF_BASE = 0x1000
STACK = 0x4000
buf_data = b"HELLO!!!"  # 8 bytes, matching the loop bound in the assembly

uc = unicorn.Uc(unicorn.UC_ARCH_ARM64, unicorn.UC_MODE_ARM)
uc.mem_map(CODE_BASE, 0x1000)
uc.mem_map(BUF_BASE, 0x1000)
uc.mem_map(STACK, 0x1000)

uc.mem_write(CODE_BASE, CODE)
uc.mem_write(BUF_BASE, buf_data)

# Loop inputs: X0 = buffer pointer, X1 = index (starts at 0), X2 = XOR key.
uc.reg_write(UC_ARM64_REG_X0, BUF_BASE)
uc.reg_write(UC_ARM64_REG_X1, 0x00)
uc.reg_write(UC_ARM64_REG_X2, 0x55)

def hook_code(uc, access, address, size, value, user_data):
    """Memory-write hook: print every byte stored into the 8-byte buffer.

    Writes outside ``[BUF_BASE, BUF_BASE + 8)`` are ignored. The print lives
    inside the range check because ``idx`` is only defined there.
    """
    if BUF_BASE <= address < BUF_BASE + 8:
        idx = address - BUF_BASE
        low_byte = value & 0xFF
        shown = chr(low_byte) if 32 <= low_byte < 127 else '.'
        print(f"    buf[{idx}] = 0x{low_byte:02x} ({shown})")

# Despite its name, hook_code is installed as a memory-write hook here.
uc.hook_add(unicorn.UC_HOOK_MEM_WRITE, hook_code)
# Run from the first instruction until execution reaches the end of CODE.
uc.emu_start(CODE_BASE, CODE_BASE + len(CODE))

# Read back the 8 buffer bytes after the loop has processed them.
result = bytes(uc.mem_read(BUF_BASE, 8))
print(f"\nresult: {result.hex()} = {result}")

了解一下 ADRP (Form PC-Relative Address to Page):

$$ X25 = (\text{PC} \& \sim \text{0xFFF}) + (\text{immediate} \ll 12) $$

此处 #off_212C70@PAGE (immediate) 即目标数据与当前指令页边界的页偏移

  1. 将当前指令所在地址(PC)的低 12 位清零,即对齐到 4KB 页边界
  2. 把指令中给出的立即数左移 12 位,结果相加放入目标寄存器

google 会说 ADRP 常与 ADD/LDR 搭配使用

利用页表的特性来设计位置无关代码:ADRP 跨大步找到目标数据所在的 4KB 页面,再用 ADD/LDR 跨小步,加上页内偏移定位到字节

闲得没事算一下,ai 一直算不对

机器码 79 08 00 F0(小端存储),按高位在前写出的 32 位指令字为 F0 00 08 79 = 11110000000000000000100001111001

ADRP 标准编码格式:

| 位范围 | 31 | 30:29 | 28:24 | 23:5 | 4:0 |
| --- | --- | --- | --- | --- | --- |
| 含义 | op | immlo | Fixed | immhi | Rd |
| ADRP 特征值 | 1 | (立即数低位) | 10000 | (立即数高位) | (目标寄存器) |

immlo = 11 = 3

immhi = 0000000000001000011 = 67

pc = 0x1031a0

>>> immlo = 3
>>> immhi = 67
>>> imm = (immhi << 2) | immlo
>>> imm
271
>>> hex((pc & ~0xFFF) + (imm << 12))
'0x212000'

IDA 中的符号 #off_212C70@PAGE 表示 212C70 的高位,即页边界,#off_212C70@PAGEOFF 表示低位,即页内偏移,原来 IDA 里面的 ADRP X25, #off_212C70@PAGE 直接当成赋值语句就好了

混淆模式#

有如下的混淆

CMP             X19, #0
MOV             W9, #0x40 ; '@'         ; X9 = 0x40
MOV             W10, #0x38 ; '8'        ; X10 = 0x38
ADRP            X25, #off_212C70@PAGE
CSEL            X9, X10, X9, EQ         ; if X19 == 0 then X9 = X10
ADD             X25, X25, #off_212C70@PAGEOFF ; X25 = 0x212C70
LDR             X8, [X25,X9]            ; X8 = qword[X25 + X9]
MOV             W9, #0xFE53             ; X9 = 0xFE53
MOV             W10, #0x82B4            ; X10 = 0x82B4
CSEL            X9, X10, X9, EQ         ; if X19 == 0 then X9 = X10
SUB             X8, X8, X9              ; X8 = X8 - X9
BR              X8                      ; 跳转到X8

0x212C70 是一个函数表,在代码里面就是 X25。X9 是函数表内的偏移,根据 X19 的值决定偏移的值,共两种情况

但是为什么偏移算完之后那么大,完全不在表里面了,有点反直觉

这样的函数反编译之后大概长这样,修复后是一个巨大的函数

/* Decompiler output for the obfuscated double-branch pattern: the target of
 * the final BR X8 is (table entry - constant), both selected by a2 == 0. */
void __fastcall sub_103168(__int64 a1, __int64 a2)
{
  __int64 v2; // x9
  __int64 v3; // x8
  __int64 v4; // x9

  _ReadStatusReg(TPIDR_EL0);
  v2 = 64;                      // table offset 0x40 when a2 != 0
  if ( !a2 )
    v2 = 56;                    // table offset 0x38 when a2 == 0
  v3 = *(__int64 *)((char *)&off_212C70 + v2);  // entry loaded from the table
  v4 = 65107;                   // 0xFE53, subtracted from the entry when a2 != 0
  if ( !a2 )
    v4 = 33460;                 // 0x82B4, subtracted from the entry when a2 == 0
  __asm { BR              X8 }  // per the listing, X8 = v3 - v4 at this point
}

下面还有大量这样类似模式的连续的间接跳转。因为 X25 复用所以有点不一样

/* Same pattern with the table base kept in x25 (v0) from an earlier block:
 * the call target is (table entry - constant), both chosen by one comparison. */
__int64 sub_103258()
{
  __int64 v0; // x25
  __int64 v1; // x26
  __int64 v2; // x29
  __int64 v3; // x0
  __int64 v4; // x8
  __int64 v5; // x8
  __int64 v6; // x9

  v3 = sub_1500BC(v2 - 96);
  if ( (v1 | 0x2B355CD000000000uLL) >= 0x3CCCE95800000000LL )
    v4 = 48;                    // table offset for the ">=" case
  else
    v4 = 0;                     // table offset for the "<" case
  v5 = *(_QWORD *)(v0 + v4);    // entry loaded relative to the reused base v0
  v6 = 49525;                   // constant for the ">=" case
  if ( (v1 | 0x2B355CD000000000uLL) < 0x3CCCE95800000000LL )
    v6 = 216;                   // constant for the "<" case
  return ((__int64 (__fastcall *)(__int64))(v5 - v6))(v3);  // indirect jump target
}

以及存在这样的单分支跳转

.text:00000000000FEA1C 9B 08 00 F0 7B 47 3C 91                       ADRL            X27, unk_211F11
.text:00000000000FEA24 81 08 00 F0 21 B0 3E 91                       ADRL            X1, unk_211FAC
.text:00000000000FEA2C 22 01 80 52                                   MOV             W2, #9
.text:00000000000FEA30 E3 03 1F 32                                   MOV             W3, #2
.text:00000000000FEA34 E0 03 1B AA                                   MOV             X0, X27
.text:00000000000FEA38 A9 9C FC 97                                   BL              sub_25CDC
.text:00000000000FEA3C 88 82 41 F9                                   LDR             X8, [X20,#0x300]
.text:00000000000FEA40 C9 5A 99 92                                   MOV             X9, #0xFFFFFFFFFFFF3529
.text:00000000000FEA44 7F 0B 00 39                                   STRB            WZR, [X27,#(byte_211F13 - 0x211F11)]
.text:00000000000FEA48 08 01 09 8B                                   ADD             X8, X8, X9
.text:00000000000FEA4C 00 01 1F D6                                   BR              X8

基址 X25 一直复用导致匹配模式计算跳转目的的思路有点麻烦,考虑模拟执行

大致思路:从函数头开始执行,以翻译块为单位处理,hook_code 遇到 ret 就停止,continue 进下一轮;遇到 bl/svc/非法内存访问则跳过该指令(pc 前进一条指令,即 pc + 4)。碰到 br 就进入回溯,找 csel, add, sub,patch 后返回分支信息和 context(用于将后续翻译块 bfs 递归处理)。如果是双分支,选 nop 块作为跳板,插入两个分支的跳转,br X8 处 patch 成跳转到这个 chunk;如果是单分支直接在原地 patch。细节见脚本,把原脚本改了挺多。ins_help 是自己写的 capstone 中间层,不是很重要。注意文件偏移和虚拟地址映射。

import queue
import struct

import keystone
import unicorn
from capstone.arm64_const import *
from elftools.elf.elffile import ELFFile
from unicorn.arm64_const import *
from unicorn import *
from ins_help import *

# Overall algorithm:
# 1. Put the function entry addresses into the queue.
# 2. Take an address from the queue and start emulating there.
# 3. While emulating, record every executed instruction together with its
#    register context on the instruction stack.
# 4. Check whether a `br reg` has been reached.
# 5. If so, walk the instruction stack to compute the target address(es)
#    (single or double branch) and patch the jump.
# 6. Put the target address(es) into the queue.
# 7. Stop on `ret` or on `bl .__stack_chk_fail`.

bin_data = None       # pristine loaded image (bytes), set by init_unicorn
out_data = None       # raw file contents (bytearray); patched and written out
runtime_data = None   # mutable copy of the image written into each new emulator
uc = None             # current Unicorn instance, recreated by create_unicorn
ins_stack = []        # (address, context, code bytes, instruction) per traced insn
block_flow = {}       # block start address -> branch info returned for that block
is_success = False    # set by hook_code once a `br` has been reached and handled
img_size = 0          # size of the loaded image, set by load_elf
# Address range scanned by find2nop for two consecutive NOPs (trampoline slots).
jmp_table_start = 0x144320
jmp_table_end = 0x148000
# Entry addresses that deobf() seeds the work queue with.
start_addrs = [
    0xFD0BC,
    0x100124,
    #0x100198,
]

def reg_ctou(regname):
    """Convert a Capstone AArch64 register name to the Unicorn register constant.

    Handles the aliases ``fp``, ``lr``, ``sp``, ``wsp``, ``xzr`` and ``wzr``
    as well as the numbered ``wN`` / ``xN`` registers. ``x29`` and ``x30``
    map to their own constants because they are not contiguous with
    ``UC_ARM64_REG_X0``..``UC_ARM64_REG_X28``.

    Returns ``None`` for any name that is not recognised (for example a
    vector register or an empty string) instead of raising.
    """
    regname = regname.lower()

    aliases = {
        'fp': UC_ARM64_REG_FP,
        'lr': UC_ARM64_REG_LR,
        'sp': UC_ARM64_REG_SP,
        'wsp': UC_ARM64_REG_WSP,
        'xzr': UC_ARM64_REG_XZR,
        'wzr': UC_ARM64_REG_WZR,
    }
    if regname in aliases:
        return aliases[regname]

    bank, number = regname[:1], regname[1:]
    if bank not in ('w', 'x') or not number.isdigit():
        return None

    idx = int(number)
    if bank == 'w':
        return idx + UC_ARM64_REG_W0
    if idx == 29:
        return UC_ARM64_REG_X29
    if idx == 30:
        return UC_ARM64_REG_X30
    return idx + UC_ARM64_REG_X0

def is_ref_ilegel_emm(mu, ins):
    """Tell whether ``ins`` accesses memory outside the image and the stack.

    Instructions without a memory operand, and those addressing through
    ``sp``, are always allowed (False). Otherwise the effective address of
    the first memory operand is computed from the current register values
    (base + shifted index + displacement). The result is False when the
    address lies within the loaded image or the emulated stack region and
    True otherwise.
    """
    operand_text = ins.op_str
    if '[' not in operand_text:
        return False
    if '[sp' in operand_text:
        # sp-relative access: allowed
        return False

    stack_lo = 0x80000000
    stack_hi = 0x80000000 + 0x1000 * 0x1000 * 8
    for operand in ins.operands:
        if operand.type != ARM64_OP_MEM:
            continue

        target = 0
        if operand.value.mem.base != 0:
            target += mu.reg_read(reg_ctou(ins.reg_name(operand.value.mem.base)))
        if operand.value.mem.index != 0:
            scaled = mu.reg_read(reg_ctou(ins.reg_name(operand.value.mem.index)))
            if operand.shift.value != 0:
                scaled <<= operand.shift.value
            target += scaled
        if operand.value.mem.disp != 0:
            target += operand.value.mem.disp

        inside_image = 0x0 <= target <= img_size   # data inside the .so
        inside_stack = stack_lo <= target < stack_hi  # data on the stack
        return not (inside_image or inside_stack)

    # A '[' in the text but no memory operand decoded: no verdict.
    return None

def set_context(uc, regs):
    """Load a register snapshot into the emulator.

    ``regs`` is laid out as produced by ``get_context``: indices 0..28 hold
    x0..x28, followed by fp, lr and sp. Passing ``None`` leaves the
    emulator untouched.
    """
    if regs is not None:
        for slot in range(29):  # x0 ~ x28
            uc.reg_write(UC_ARM64_REG_X0 + slot, regs[slot])
        tail = (
            (UC_ARM64_REG_FP, 29),
            (UC_ARM64_REG_LR, 30),
            (UC_ARM64_REG_SP, 31),
        )
        for reg_id, slot in tail:
            uc.reg_write(reg_id, regs[slot])

def get_context(uc):
    """Return a register snapshot: x0..x28, then fp, lr and sp (32 values)."""
    snapshot = [uc.reg_read(UC_ARM64_REG_X0 + slot) for slot in range(29)]
    for reg_id in (UC_ARM64_REG_FP, UC_ARM64_REG_LR, UC_ARM64_REG_SP):
        snapshot.append(uc.reg_read(reg_id))
    return snapshot

def get_context_reg(context, reg):
    """Look up the value of Unicorn register constant ``reg`` in a snapshot.

    ``context`` uses the layout produced by ``get_context``. The zero
    registers read as 0 and W registers yield the low 32 bits of the
    matching slot. Raises ``KeyError`` for any other register constant.
    """
    if reg == UC_ARM64_REG_XZR or reg == UC_ARM64_REG_WZR:
        return 0

    if UC_ARM64_REG_X0 <= reg <= UC_ARM64_REG_X28:
        slot = reg - UC_ARM64_REG_X0
        return context[slot]

    if UC_ARM64_REG_W0 <= reg <= UC_ARM64_REG_W30:
        slot = reg - UC_ARM64_REG_W0
        return context[slot] & 0xFFFFFFFF

    named_slots = (
        (UC_ARM64_REG_FP, 29),
        (UC_ARM64_REG_LR, 30),
        (UC_ARM64_REG_SP, 31),
    )
    for reg_id, slot in named_slots:
        if reg == reg_id:
            return context[slot]

    raise KeyError('unsupported reg const: %r' % (reg,))

def format_bytes(data):
    """Render an iterable of byte values as space-separated two-digit hex."""
    as_hex = map("{:02x}".format, data)
    return ' '.join(as_hex)

def dump_ins_stack(ins_stack, limit=8):
    """Print the last ``limit`` traced instructions of the current block.

    Each entry of ``ins_stack`` is ``(address, context, code, instruction)``;
    the context is not printed.
    """
    print('[+] recent block tail:')
    tail = ins_stack[-limit:]
    for pc, _context, raw, decoded in tail:
        hex_bytes = format_bytes(raw)
        print(f"    0x{pc:x}:\t{hex_bytes}\t{decoded.mnemonic}\t{decoded.op_str}")

def create_unicorn():
    """Create a fresh ARM64 emulator loaded with the current runtime image.

    Maps two equally sized regions (one at 0x80000000 for the stack, one at
    0 for the image), copies ``runtime_data`` to address 0, points SP into
    the stack region and installs the code and unmapped-memory hooks. The
    new instance is stored in the global ``uc`` and also returned.
    """
    global uc

    region_size = 8 * 0x1000 * 0x1000
    stack_base = 0x80000000

    uc = emulator = Uc(UC_ARCH_ARM64, UC_MODE_ARM)
    emulator.mem_map(stack_base, region_size)
    emulator.mem_map(0, region_size)
    emulator.mem_write(0, bytes(runtime_data))
    emulator.reg_write(UC_ARM64_REG_SP, stack_base + 0x1000 * 0x1000 * 6)
    emulator.hook_add(UC_HOOK_CODE, hook_code)
    emulator.hook_add(UC_HOOK_MEM_UNMAPPED, hook_mem_access)
    return emulator

def apply_patch(addr, new_bytes):
    """Write ``new_bytes`` at ``addr`` into both the runtime image and the output.

    Before patching, the bytes currently in the live emulator (if one
    exists), the runtime image, the output buffer and the pristine image are
    printed, and a warning is printed where they disagree.
    """
    global out_data
    global runtime_data

    patch = bytes(new_bytes)
    length = len(patch)
    span = slice(addr, addr + length)

    if uc is not None:
        live_bytes = bytes(uc.mem_read(addr, length))
    else:
        live_bytes = b''
    exec_bytes = bytes(runtime_data[span])
    file_bytes = bytes(out_data[span])
    orig_bytes = bytes(bin_data[span])

    if live_bytes:
        print("live : %s" % format_bytes(live_bytes))
    for label, shown in (("exec", exec_bytes), ("file", file_bytes), ("orig", orig_bytes)):
        print("%s : %s" % (label, format_bytes(shown)))

    if live_bytes and live_bytes != exec_bytes:
        print('warning! live and exec not match at %x' % (addr))
    if file_bytes != exec_bytes:
        print('warning! file and exec not match at %x' % (addr))

    runtime_data[span] = patch
    out_data[span] = patch
    print('patch code: %x\t%s => %s' % (addr, format_bytes(exec_bytes), format_bytes(patch)))

def branch_reach_limit(mnemonic):
    """Return the ``(low, high)`` byte-offset window a branch can reach.

    - ``b`` uses imm26 << 2, i.e. +/-128 MB.
    - Conditional branches (``b.eq`` or ``beq`` style) use imm19 << 2,
      i.e. +/-1 MB. This includes ``blt``/``ble``/``bls``/``blo``, which
      start with ``bl`` but are conditional branches rather than BL.
    - ``bl`` and its relatives, and anything not starting with ``b``, yield
      ``None`` (no range check).

    The window is half-open: ``low <= delta < high``.
    """
    mnemonic = mnemonic.lower()
    if mnemonic == 'b':
        # imm26 << 2
        return (-0x8000000, 0x8000000)

    if not mnemonic.startswith('b'):
        return None

    condition_codes = (
        'eq', 'ne', 'cs', 'hs', 'cc', 'lo', 'mi', 'pl', 'vs',
        'vc', 'hi', 'ls', 'ge', 'lt', 'gt', 'le', 'al', 'nv',
    )
    is_conditional = mnemonic[1:].lstrip('.') in condition_codes
    # Any other non-"bl" mnemonic keeps the historical imm19 window.
    if is_conditional or not mnemonic.startswith('bl'):
        # imm19 << 2
        return (-0x100000, 0x100000)
    return None

def assemble_branch(src, dest, mnemonic, block_base=None):
    """Assemble ``<mnemonic> <dest>`` as if located at ``src``.

    ``block_base`` is only used in error messages (0 is shown when it is
    ``None``). Returns the encoding produced by Keystone.

    Raises ``ValueError`` when ``dest`` is ``None``, not 4-byte aligned,
    outside the image, out of reach for the mnemonic, or when Keystone
    rejects the instruction.
    """
    block_label = 0 if block_base is None else block_base

    if dest is None:
        raise ValueError(
            'branch target is None: block=0x%x src=0x%x ins=%s'
            % (block_label, src, mnemonic)
        )
    if dest & 0x3:
        raise ValueError(
            'branch target is not 4-byte aligned: block=0x%x src=0x%x dest=0x%x ins=%s'
            % (block_label, src, dest, mnemonic)
        )
    if not (0 <= dest < img_size):
        raise ValueError(
            'branch target is outside image: block=0x%x src=0x%x dest=0x%x img_size=0x%x ins=%s'
            % (block_label, src, dest, img_size, mnemonic)
        )

    window = branch_reach_limit(mnemonic)
    if window is not None:
        low, high = window
        distance = dest - src
        if not (low <= distance < high):
            raise ValueError(
                'branch target out of range: block=0x%x src=0x%x dest=0x%x delta=%#x ins=%s limit=[%#x, %#x)'
                % (block_label, src, dest, distance, mnemonic, low, high)
            )

    text = '%s %s' % (mnemonic, hex(dest))
    assembler = keystone.Ks(keystone.KS_ARCH_ARM64, keystone.KS_MODE_LITTLE_ENDIAN)
    try:
        encoding = assembler.asm(text, src)[0]
    except keystone.KsError as exc:
        raise ValueError(
            'keystone failed: block=0x%x src=0x%x dest=0x%x asm=`%s` error=%s'
            % (block_label, src, dest, text, exc)
        ) from exc
    return encoding

def get_double_branch(uc, ins_stack):
    """Resolve both targets of a conditional indirect jump.

    Walks the traced instructions backwards from the final ``br`` and
    matches this pattern (registers are examples)::

        CSEL  X9, X10, X9, EQ    ; selects the table index
        LDR   X8, [X25, X9]      ; loads the table entry
        CSEL  X9, X10, X9, EQ    ; selects the adjustment constant
        SUB   X8, X8, X9         ; or ADD
        BR    X8

    For each CSEL, the values of its two source registers are taken from
    the context recorded just before that instruction ran. The table
    entries are read from emulator memory.

    Returns ``(target_if_cond, target_otherwise, context, cond)``, where
    ``context`` is the emulator's current register snapshot and ``cond`` is
    the condition text of the CSEL feeding the ADD/SUB, or ``None`` when the
    pattern is not found.

    NOTE(review): assumes both CSELs test the same condition -- confirm.
    """
    flag_br = False
    flag_sub_add = False
    flag_ldr = False
    flag_csel1 = False
    flag_csel2 = False
    is_sub = True
    br_reg = None
    op_reg1 = None
    op_reg2 = None
    reg2_value1 = None
    reg2_value2 = None
    op_reg3 = None
    reg3_value1 = None
    reg3_value2 = None
    ldr_index_scale = 1
    table_base = None
    cond = ''

    for addr, context, _, ins in ins_stack[::-1]:
        mnemonic = ins.mnemonic.lower()

        # BR              X8
        if mnemonic == 'br' and not flag_br:
            flag_br = True
            br_reg = ins.operands[0].reg

        # SUB             X8, X8, X9
        if flag_br and mnemonic in ('add', 'sub') \
                and ins.operands[0].reg == br_reg and not flag_sub_add:
            if ins.operands[1].type == ARM64_OP_REG and ins.operands[2].type == ARM64_OP_REG:
                op_reg1 = ins.operands[1].reg
                op_reg2 = ins.operands[2].reg
                # Remember the direction so ADD is not treated as SUB.
                is_sub = mnemonic == 'sub'
                flag_sub_add = True

        # CSEL            X9, X10, X9, EQ   (adjustment constant)
        if flag_sub_add and mnemonic == 'csel' and ins.operands[0].reg == op_reg2 \
                and not flag_csel1:
            cond = ins.op_str.split(', ')[-1]
            regname1 = ins.reg_name(ins.operands[1].reg)
            regname2 = ins.reg_name(ins.operands[2].reg)
            reg2_value1 = get_context_reg(context, reg_ctou(regname1))
            reg2_value2 = get_context_reg(context, reg_ctou(regname2))
            flag_csel1 = True

        # LDR             X8, [X25,X9]
        if flag_sub_add and mnemonic == 'ldr' and ins.operands[0].reg == op_reg1 \
                and not flag_ldr:
            mem_op = None
            for op in ins.operands:
                if op.type == ARM64_OP_MEM:
                    mem_op = op
                    break
            assert mem_op is not None, 'not find mem operand: %x\t%s\t%s' % (addr, ins.mnemonic, ins.op_str)
            table_base = 0
            if mem_op.mem.base != 0:
                table_base += get_context_reg(context, reg_ctou(ins.reg_name(mem_op.mem.base)))
            if mem_op.mem.disp != 0:
                table_base += mem_op.mem.disp
            if mem_op.mem.index != 0:
                op_reg3 = reg_ctou(ins.reg_name(mem_op.mem.index))
            if mem_op.shift.value != 0:
                ldr_index_scale = 1 << mem_op.shift.value
            flag_ldr = True

        # CSEL            X9, X10, X9, EQ   (table index)
        if flag_ldr and mnemonic == 'csel' \
                and reg_ctou(ins.reg_name(ins.operands[0].reg)) == op_reg3 \
                and not flag_csel2:
            regname1 = ins.reg_name(ins.operands[1].reg)
            regname2 = ins.reg_name(ins.operands[2].reg)
            reg3_value1 = get_context_reg(context, reg_ctou(regname1))
            reg3_value2 = get_context_reg(context, reg_ctou(regname2))
            flag_csel2 = True

    if not (flag_csel1 and flag_csel2):
        return None

    def resolve(index_value, adjust):
        # Table entries are little-endian 64-bit values read from emulator
        # memory. The ADD/SUB wraps at 64 bits exactly as the CPU would.
        raw = uc.mem_read(table_base + index_value * ldr_index_scale, 8)
        entry = struct.unpack('<Q', raw)[0]
        target = entry - adjust if is_sub else entry + adjust
        return target & 0xFFFFFFFFFFFFFFFF

    # Target taken when the condition holds (CSEL picks its first source).
    offset1 = resolve(reg3_value1, reg2_value1)
    # Target taken when the condition does not hold.
    offset2 = resolve(reg3_value2, reg2_value2)
    return (offset1, offset2, get_context(uc), cond)

def get_single_branch(uc, ins_stack):
    """Resolve the target of an unconditional ``br reg`` ending the block.

    The target is the value of the branch register in the context recorded
    just before the ``br`` ran. It is looked up through ``get_context_reg``
    so that x29/x30 (fp/lr) and the zero register resolve correctly as well.

    Returns ``(target, context)`` with the emulator's current register
    snapshot, or ``None`` when the stack is empty or its last instruction
    is not a ``br``.
    """
    if not ins_stack:
        return None

    _, context, _, ins = ins_stack[-1]
    if ins.mnemonic.lower() != 'br':
        return None

    target_reg = reg_ctou(ins.reg_name(ins.operands[0].reg))
    return (get_context_reg(context, target_reg), get_context(uc))

def find2nop(uc):
    """Find the first 8-byte slot in the jump-table range holding two NOPs.

    Scans ``out_data`` from ``jmp_table_start`` to ``jmp_table_end`` in
    8-byte steps and returns the address of the first slot that disassembles
    to two ``nop`` instructions, or ``None`` if there is none. Slots that
    decode to fewer than two instructions (for example data) are skipped.

    ``uc`` is unused; it is kept for the existing call signature.

    NOTE(review): ``out_data`` holds raw file bytes but is indexed here by
    address -- this presumes file offsets equal virtual addresses in this
    range; confirm for the target binary.
    """
    disassembler = InsHelp()
    for addr in range(jmp_table_start, jmp_table_end, 8):
        window = out_data[addr: addr + 8]
        decoded = list(disassembler.disasm(window, addr, False))
        if len(decoded) < 2:
            continue
        if decoded[0].mnemonic.lower() == 'nop' and decoded[1].mnemonic.lower() == 'nop':
            return addr
    return None

def patch_single_branch(src, dest, block_base=None):
    """Replace the instruction at ``src`` with an unconditional ``b dest``.

    ``block_base`` is forwarded for error messages only. Raises
    ``ValueError`` (from ``assemble_branch``) if the branch cannot be encoded.
    """
    encoded_jump = assemble_branch(src, dest, 'b', block_base)
    apply_patch(src, encoded_jump)
def patch_double_branch(uc, addr, branch, block_base=None):
    """Rewrite a conditional indirect jump as direct branches via a trampoline.

    ``branch`` is the tuple returned by ``get_double_branch``:
    ``(target_if_cond, target_otherwise, context, cond)``.

    The ``br reg`` at ``addr`` becomes ``b <slot>``, where the slot is a free
    pair of NOPs that receives ``b<cond> target_if_cond`` followed by
    ``b target_otherwise``. All three instructions are assembled before any
    byte is patched, so an encoding failure leaves the image untouched.

    Raises ``AssertionError`` when no free NOP pair exists (raised
    explicitly so the check survives ``python -O``) and ``ValueError`` when
    a branch cannot be encoded.
    """
    nop_addr = find2nop(uc)
    if nop_addr is None:
        raise AssertionError('no find 2 nop')

    offset1 = branch[0]
    offset2 = branch[1]
    cond = branch[3]

    # 1. Turn the `br reg` into a jump to the trampoline slot.
    jmp1_bin = assemble_branch(addr, nop_addr, 'b', block_base)

    # 2. b<cond> target_if_cond
    jmp2_bin = assemble_branch(nop_addr, offset1, 'b' + cond, block_base)

    # 3. b target_otherwise
    jmp3_bin = assemble_branch(nop_addr + 4, offset2, 'b', block_base)

    apply_patch(addr, jmp1_bin)
    apply_patch(nop_addr, jmp2_bin)
    apply_patch(nop_addr + 4, jmp3_bin)
def hook_code(uc, address, size, user_data):
    """UC_HOOK_CODE callback: trace one block and patch its final ``br``.

    For every instruction it prints a trace line and records
    ``(address, context, code, instruction)`` on ``ins_stack``. Emulation is
    stopped on ``ret``, on the hard-coded ``bl`` target, and on ``br``.
    ``bl*``, ``svc*`` and instructions flagged by ``is_ref_ilegel_emm`` are
    skipped by moving PC past them. On ``br`` the targets are resolved
    (double branch first, then single), the jump is patched and the result
    is stored in ``block_flow`` under the block's first address;
    ``is_success`` stays True only if that all succeeded.
    """
    global ins_stack
    global is_success

    # A previous callback already handled this block: just stop.
    if is_success == True:
        uc.emu_stop()
        return

    ins_help = InsHelp()
    code = uc.mem_read(address, size)
    ins = list(ins_help.disasm(code, address, False))[0]

    print("[+] tracing instruction\t0x%x:\t%s\t%s\t%s" % (
        ins.address, 
    ' '.join(f"{b:02x}" for b in ins.bytes),
        ins.mnemonic, 
        ins.op_str))


    # Record the instruction together with the register context before it runs.
    ins_stack.append((address, get_context(uc), bytes(code), ins))

    # Stop as soon as a ret is reached.
    if ins.mnemonic.lower() == 'ret':
        #uc.reg_write(UC_ARM64_REG_PC, 0)
        print("[+] encountered ret, stop")
        ins_stack.clear()
        uc.emu_stop()
        return

    # Stop on `bl .__stack_chk_fail`.
    # NOTE(review): 0x237C0 is presumably that function's address in the
    # target binary -- confirm when switching binaries.
    if ins.mnemonic.lower() == 'bl' and ins.operands[0].imm == 0x237C0:
        #uc.reg_write(UC_ARM64_REG_PC, 0)
        print("[+] encountered bl .__stack_chk_fail, stop")
        ins_stack.clear()
        uc.emu_stop()
        return

    # Skip calls (bl*), svc, and memory accesses outside the image and stack
    # by advancing PC to the next instruction.
    if ins.mnemonic.lower().startswith('bl') or is_ref_ilegel_emm(uc, ins) or ins.mnemonic.lower().startswith('svc'):
        print("[+] pass instruction 0x%x\t%s\t%s" % (ins.address, ins.mnemonic, ins.op_str))
        uc.reg_write(UC_ARM64_REG_PC, address + size)
        return

    # NOTE(review): unlike the checks above, this comparison does not
    # lower-case the mnemonic -- confirm the disassembler emits lower case.
    if ins.mnemonic == "br":
        # Reached the indirect jump: assume success until a step below fails.
        is_success = True
        block_base = ins_stack[0][0]   # address of the first traced instruction
        jmp_addr = ins_stack[-1][0]    # address of the br itself
        ret = get_double_branch(uc, ins_stack)
        if ret != None:
            print('find double branch: %x => %x, %x' % (block_base, ret[0], ret[1]))
            try:
                patch_double_branch(uc, jmp_addr, ret, block_base)
            except ValueError as exc:
                print("[+] patch double branch failed for block 0x%x at br 0x%x: %s" % (
                    block_base,
                    jmp_addr,
                    exc,
                ))
                dump_ins_stack(ins_stack)
                is_success = False
            else:
                block_flow[block_base] = ret
        else:
            # No double-branch pattern: fall back to the single-branch case.
            ret = get_single_branch(uc, ins_stack)
            if ret == None:
                print("[+] find dest failed 0x%x\t%s\t%s" % (ins.address, ins.mnemonic, ins.op_str))
                print("[+] unresolved block base: 0x%x" % (block_base))
                dump_ins_stack(ins_stack)
                is_success = False
            else:
                print('find single branch: %x => %x' % (block_base, ret[0]))
                try:
                    patch_single_branch(jmp_addr, ret[0], block_base)
                except ValueError as exc:
                    print("[+] patch single branch failed for block 0x%x at br 0x%x: %s" % (
                        block_base,
                        jmp_addr,
                        exc,
                    ))
                    dump_ins_stack(ins_stack)
                    is_success = False
                else:
                    block_flow[block_base] = ret
        # The block is finished either way: reset the trace and stop.
        ins_stack.clear()
        uc.emu_stop()
        return
def hook_mem_access(uc, type, address, size, value, userdata):
    """Unmapped-memory callback: report the faulting access and stop.

    Prints the current PC together with the access type, address and size,
    requests an emulation stop, and returns False.
    """
    fault_pc = uc.reg_read(UC_ARM64_REG_PC)
    report = 'error! access invalid mem, pc:%x type:%d addr:%x size:%x' % (
        fault_pc, type, address, size)
    print(report)
    uc.emu_stop()
    return False

def load_elf(filename):
    """Load the PT_LOAD segments of an ELF file into a flat memory image.

    Side effects: sets the global ``out_data`` to the raw file contents and
    ``img_size`` to the highest ``p_vaddr + p_filesz`` over all PT_LOAD
    segments.

    Returns a ``bytearray`` of ``img_size`` bytes with every segment's file
    data placed at its virtual address. Gaps are zero-filled.

    Raises ``ValueError`` if the file has no PT_LOAD segment.
    """
    global img_size
    global out_data
    segs = []
    with open(filename, 'rb') as f:
        out_data = bytearray(f.read())
        for seg in ELFFile(f).iter_segments('PT_LOAD'):
            print('file_off:%s, va: %s, size: %s' %(hex(seg['p_offset']), hex(seg['p_vaddr']), hex(seg['p_filesz'])))
            segs.append((seg['p_offset'], seg['p_vaddr'], seg['p_filesz'], seg.data()))

    if not segs:
        raise ValueError('no PT_LOAD segment found in %s' % (filename,))

    # Do not rely on the segments being listed in ascending address order.
    img_size = max(vaddr + size for _, vaddr, size, _ in segs)
    byte_arr = bytearray(img_size)
    for _, vaddr, size, data in segs:
        byte_arr[vaddr: vaddr + size] = data

    return byte_arr

def init_unicorn(file_name):
    """Load ``file_name`` and reset the module-level emulation state.

    ``bin_data`` becomes an immutable copy of the loaded image,
    ``runtime_data`` a mutable copy of it, and ``uc`` is cleared so that no
    emulator instance exists yet.
    """
    global bin_data, runtime_data, uc

    # Load the .so into a flat memory image.
    image = load_elf(file_name)
    bin_data = bytes(image)
    runtime_data = bytearray(bin_data)
    uc = None

def run(addr, context):
    """Emulate one block starting at ``addr`` with the given register context.

    A fresh emulator is created for every block. Emulation ends when
    ``hook_code`` stops it or when the emulator reports an error, for
    example an access to unmapped memory. Such an error is logged instead
    of aborting the whole deobfuscation run.

    Returns the branch info stored in ``block_flow`` for ``addr`` when the
    block's ``br`` was resolved and patched, otherwise ``None``.
    """
    global uc
    global ins_stack
    global is_success
    global block_flow

    ins_stack.clear()
    create_unicorn()
    set_context(uc, context)
    try:
        # Returning from emu_start means hook_code called emu_stop.
        uc.emu_start(addr, 0x10000)
    except unicorn.UcError as exc:
        print('[+] emulation error in block 0x%x: %s' % (addr, exc))

    if is_success:
        is_success = False
        return block_flow[addr]  # branch info and context
    return None

def deobf(filename='', patched_filename='out.so'):
    """Resolve and patch indirect jumps reachable from ``start_addrs``.

    Args:
        filename: path of the ELF file to process. The default is an empty
            string, so a real path must be supplied for the run to succeed.
        patched_filename: path the patched file is written to.

    Blocks are processed breadth-first. Each queue entry is
    ``(address, context)``; every resolved branch target is queued once,
    together with the register context captured at the end of its
    predecessor. Finally the discovered control flow is printed and the
    patched bytes are saved.
    """
    init_unicorn(filename)

    q = queue.Queue()
    for start_addr in start_addrs:
        q.put((start_addr, None))  # entry blocks start with no saved context
    traced = set()               # blocks already emulated
    pending = set(start_addrs)   # blocks queued but not yet emulated

    def enqueue(target, context):
        # Queue a block only once.
        if target not in traced and target not in pending:
            q.put((target, context))
            pending.add(target)

    while not q.empty():
        addr, context = q.get()
        pending.discard(addr)
        if addr in traced:
            continue
        traced.add(addr)
        s = run(addr, context)  # emulate until the block's `br reg`

        if s is None:
            continue

        if len(s) == 2:  # single branch: (target, context)
            enqueue(s[0], s[1])
        else:  # double branch: (target1, target2, context, cond)
            enqueue(s[0], s[2])
            enqueue(s[1], s[2])

    # Print the recovered control flow.
    for addr, flow in block_flow.items():
        if len(flow) == 4:
            print('%s => %s, %s, %s' % (hex(addr), hex(flow[0]), hex(flow[1]), flow[3]))
        else:
            print('%s => %s' % (hex(addr), hex(flow[0])))

    # Save the patched .so.
    with open(patched_filename, 'wb') as f:
        f.write(out_data)

# Script entry point: run the deobfuscation pass when executed directly.
if __name__ == '__main__':
    deobf()