How To Access The Peb Of Another Process With Python Ctypes
Solution 1:
It bears repeating that NtQueryInformationProcess
is a native system call, which is discouraged in Windows programming. Microsoft doesn't provide an import library for ntdll.dll, so the only way to call its exported functions is dynamically via GetProcAddress
. That's of course how ctypes works, so calling native NTAPI functions is no more or less difficult from Python. The problems are the lack of official support and documentation, and that NT data structures, APIs, and available information classes are all subject to change.
Note also that querying the ProcessBasicInformation
retrieves the address of the 64-bit PEB when called from a 64-bit process. Thus when querying a WOW64 32-bit process from a 64-bit process, you'll only see the native 64-bit modules ntdll.dll, wow64.dll, wow64win.dll, and wow64cpu.dll. Here's an answer that provides a technique to find the address of the 32-bit PEB by using a magic offset from the 64-bit TEB to the 32-bit TEB, which has a pointer to the 32-bit PEB. But of course this implementation detail can change at any time, breaking code that depends on it.
The following example has the ctypes definitions that are required to query and use ProcessBasicInformation
for a given process that has the same architecture (i.e. native 64-bit or WOW64 32-bit). It includes a class that demonstrates usage and provides properties for the process ID, session ID, image path, command line, and the paths for loaded modules.
The example uses a RemotePointer
subclass of ctypes._Pointer
, along with an RPOINTER
factory function. This class overrides __getitem__
to facilitate dereferencing a pointer value in the address space of another process. The index key is a tuple of the form index, handle[, size]
. The optional size
parameter (in bytes) is useful for sized strings such as NTAPI UNICODE_STRING
, e.g. ustr.Buffer[0, hProcess, usrt.Length]
. Null-terminated strings are not supported, since ReadProcessMemory
requires a sized buffer.
The logic for walking the loader data is in the private _modules_iter
method, which walks the loaded modules using the in-memory-order linked list. Note that InMemoryOrderModuleList
links to the InMemoryOrderLinks
field of the LDR_DATA_TABLE_ENTRY
structure, and so on for each link in the list. The module iterator has to adjust the base address for each entry by the offset to this field. In the C API this would use the CONTAINING_RECORD
macro.
The ProcessInformation
constructor defaults to querying the current process if no process ID or handle is provided. If the call status is an error or warning (i.e. negative NTSTATUS
), it calls NtError
to get an instance of OSError
, or WindowsError
prior to 3.3.
I have, but did not include, a more elaborate version of NtError
that calls FormatMessage
to get a formatted error message, using ntdll.dll as the source module. I can update the answer to include this version upon request.
The example was tested in Windows 7 and 10, using 32-bit and 64-bit versions of Python 2.7 and 3.5. For the remote process test, the subprocess module is used to start a 2nd Python instance. An event handle is passed to the child process for synchronization. If the parent process doesn't wait for the child process to finish loading and set the event, then the child's loader data may not be completely initialized when read.
import ctypes
from ctypes import wintypes
ntdll = ctypes.WinDLL('ntdll')
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
# WINAPI Definitions
PROCESS_VM_READ = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400
ERROR_INVALID_HANDLE = 0x0006
ERROR_PARTIAL_COPY = 0x012B
PULONG = ctypes.POINTER(wintypes.ULONG)
ULONG_PTR = wintypes.LPVOID
SIZE_T = ctypes.c_size_t
def_check_bool(result, func, args):
ifnot result:
raise ctypes.WinError(ctypes.get_last_error())
return args
kernel32.ReadProcessMemory.errcheck = _check_bool
kernel32.ReadProcessMemory.argtypes = (
wintypes.HANDLE, # _In_ hProcess
wintypes.LPCVOID, # _In_ lpBaseAddress
wintypes.LPVOID, # _Out_ lpBuffer
SIZE_T, # _In_ nSize
ctypes.POINTER(SIZE_T)) # _Out_ lpNumberOfBytesRead
kernel32.CloseHandle.errcheck = _check_bool
kernel32.CloseHandle.argtypes = (wintypes.HANDLE,)
kernel32.GetCurrentProcess.restype = wintypes.HANDLE
kernel32.GetCurrentProcess.argtypes = ()
kernel32.OpenProcess.errcheck = _check_bool
kernel32.OpenProcess.restype = wintypes.HANDLE
kernel32.OpenProcess.argtypes = (
wintypes.DWORD, # _In_ dwDesiredAccess
wintypes.BOOL, # _In_ bInheritHandle
wintypes.DWORD) # _In_ dwProcessIdclassRemotePointer(ctypes._Pointer):
def__getitem__(self, key):
# TODO: slicing
size = Noneifnotisinstance(key, tuple):
raise KeyError('must be (index, handle[, size])')
iflen(key) > 2:
index, handle, size = key
else:
index, handle = key
ifisinstance(index, slice):
raise TypeError('slicing is not supported')
dtype = self._type_
offset = ctypes.sizeof(dtype) * index
address = PVOID.from_buffer(self).value + offset
simple = issubclass(dtype, ctypes._SimpleCData)
if simple and size isnotNone:
if dtype._type_ == wintypes.WCHAR._type_:
buf = (wintypes.WCHAR * (size // 2))()
else:
buf = (ctypes.c_char * size)()
else:
buf = dtype()
nread = SIZE_T()
kernel32.ReadProcessMemory(handle,
address,
ctypes.byref(buf),
ctypes.sizeof(buf),
ctypes.byref(nread))
if simple:
return buf.value
return buf
def__setitem__(self, key, value):
# TODO: kernel32.WriteProcessMemoryraise TypeError('remote pointers are read only')
@propertydefcontents(self):
# a handle is requiredraise NotImplementedError
_remote_pointer_cache = {}
defRPOINTER(dtype):
if dtype in _remote_pointer_cache:
return _remote_pointer_cache[dtype]
name = 'RP_%s' % dtype.__name__
ptype = type(name, (RemotePointer,), {'_type_': dtype})
_remote_pointer_cache[dtype] = ptype
return ptype
# NTAPI Definitions
NTSTATUS = wintypes.LONG
PVOID = wintypes.LPVOID
RPWSTR = RPOINTER(wintypes.WCHAR)
PROCESSINFOCLASS = wintypes.ULONG
ProcessBasicInformation = 0
ProcessDebugPort = 7
ProcessWow64Information = 26
ProcessImageFileName = 27
ProcessBreakOnTermination = 29
STATUS_UNSUCCESSFUL = NTSTATUS(0xC0000001)
STATUS_INFO_LENGTH_MISMATCH = NTSTATUS(0xC0000004).value
STATUS_INVALID_HANDLE = NTSTATUS(0xC0000008).value
STATUS_OBJECT_TYPE_MISMATCH = NTSTATUS(0xC0000024).value
classUNICODE_STRING(ctypes.Structure):
_fields_ = (('Length', wintypes.USHORT),
('MaximumLength', wintypes.USHORT),
('Buffer', RPWSTR))
classLIST_ENTRY(ctypes.Structure):
pass
RPLIST_ENTRY = RPOINTER(LIST_ENTRY)
LIST_ENTRY._fields_ = (('Flink', RPLIST_ENTRY),
('Blink', RPLIST_ENTRY))
classLDR_DATA_TABLE_ENTRY(ctypes.Structure):
_fields_ = (('Reserved1', PVOID * 2),
('InMemoryOrderLinks', LIST_ENTRY),
('Reserved2', PVOID * 2),
('DllBase', PVOID),
('EntryPoint', PVOID),
('Reserved3', PVOID),
('FullDllName', UNICODE_STRING),
('Reserved4', wintypes.BYTE * 8),
('Reserved5', PVOID * 3),
('CheckSum', PVOID),
('TimeDateStamp', wintypes.ULONG))
RPLDR_DATA_TABLE_ENTRY = RPOINTER(LDR_DATA_TABLE_ENTRY)
classPEB_LDR_DATA(ctypes.Structure):
_fields_ = (('Reserved1', wintypes.BYTE * 8),
('Reserved2', PVOID * 3),
('InMemoryOrderModuleList', LIST_ENTRY))
RPPEB_LDR_DATA = RPOINTER(PEB_LDR_DATA)
classRTL_USER_PROCESS_PARAMETERS(ctypes.Structure):
_fields_ = (('Reserved1', wintypes.BYTE * 16),
('Reserved2', PVOID * 10),
('ImagePathName', UNICODE_STRING),
('CommandLine', UNICODE_STRING))
RPRTL_USER_PROCESS_PARAMETERS = RPOINTER(RTL_USER_PROCESS_PARAMETERS)
PPS_POST_PROCESS_INIT_ROUTINE = PVOID
classPEB(ctypes.Structure):
_fields_ = (('Reserved1', wintypes.BYTE * 2),
('BeingDebugged', wintypes.BYTE),
('Reserved2', wintypes.BYTE * 1),
('Reserved3', PVOID * 2),
('Ldr', RPPEB_LDR_DATA),
('ProcessParameters', RPRTL_USER_PROCESS_PARAMETERS),
('Reserved4', wintypes.BYTE * 104),
('Reserved5', PVOID * 52),
('PostProcessInitRoutine', PPS_POST_PROCESS_INIT_ROUTINE),
('Reserved6', wintypes.BYTE * 128),
('Reserved7', PVOID * 1),
('SessionId', wintypes.ULONG))
RPPEB = RPOINTER(PEB)
classPROCESS_BASIC_INFORMATION(ctypes.Structure):
_fields_ = (('Reserved1', PVOID),
('PebBaseAddress', RPPEB),
('Reserved2', PVOID * 2),
('UniqueProcessId', ULONG_PTR),
('Reserved3', PVOID))
defNtError(status):
import sys
descr = 'NTSTATUS(%#08x) ' % (status % 2**32,)
if status & 0xC0000000 == 0xC0000000:
descr += '[Error]'elif status & 0x80000000 == 0x80000000:
descr += '[Warning]'elif status & 0x40000000 == 0x40000000:
descr += '[Information]'else:
descr += '[Success]'if sys.version_info[:2] < (3, 3):
return WindowsError(status, descr)
return OSError(None, descr, None, status)
NtQueryInformationProcess = ntdll.NtQueryInformationProcess
NtQueryInformationProcess.restype = NTSTATUS
NtQueryInformationProcess.argtypes = (
wintypes.HANDLE, # _In_ ProcessHandle
PROCESSINFOCLASS, # _In_ ProcessInformationClass
PVOID, # _Out_ ProcessInformation
wintypes.ULONG, # _In_ ProcessInformationLength
PULONG) # _Out_opt_ ReturnLengthclassProcessInformation(object):
_close_handle = False
_closed = False
_module_names = Nonedef__init__(self, process_id=None, handle=None):
if process_id isNoneand handle isNone:
handle = kernel32.GetCurrentProcess()
elif handle isNone:
handle = kernel32.OpenProcess(PROCESS_VM_READ |
PROCESS_QUERY_INFORMATION,
False, process_id)
self._close_handle = True
self._handle = handle
self._query_info()
if process_id isnotNoneand self._process_id != process_id:
raise NtError(STATUS_UNSUCCESSFUL)
def__del__(self, CloseHandle=kernel32.CloseHandle):
if self._close_handle andnot self._closed:
try:
CloseHandle(self._handle)
except WindowsError as e:
if e.winerror != ERROR_INVALID_HANDLE:
raise
self._closed = Truedef_query_info(self):
info = PROCESS_BASIC_INFORMATION()
handle = self._handle
status = NtQueryInformationProcess(handle,
ProcessBasicInformation,
ctypes.byref(info),
ctypes.sizeof(info),
None)
if status < 0:
raise NtError(status)
self._process_id = info.UniqueProcessId
self._peb = peb = info.PebBaseAddress[0, handle]
self._params = peb.ProcessParameters[0, handle]
self._ldr = peb.Ldr[0, handle]
def_modules_iter(self):
headaddr = (PVOID.from_buffer(self._peb.Ldr).value +
PEB_LDR_DATA.InMemoryOrderModuleList.offset)
offset = LDR_DATA_TABLE_ENTRY.InMemoryOrderLinks.offset
pentry = self._ldr.InMemoryOrderModuleList.Flink
while pentry:
pentry_void = PVOID.from_buffer_copy(pentry)
if pentry_void.value == headaddr:
break
pentry_void.value -= offset
pmod = RPLDR_DATA_TABLE_ENTRY.from_buffer(pentry_void)
mod = pmod[0, self._handle]
yield mod
pentry = LIST_ENTRY.from_buffer(mod, offset).Flink
defupdate_module_names(self):
names = []
for m in self._modules_iter():
ustr = m.FullDllName
name = ustr.Buffer[0, self._handle, ustr.Length]
names.append(name)
self._module_names = names
@propertydefmodule_names(self):
if self._module_names isNone:
self.update_module_names()
return self._module_names
@propertydefprocess_id(self):
return self._process_id
@propertydefsession_id(self):
return self._peb.SessionId
@propertydefimage_path(self):
ustr = self._params.ImagePathName
return ustr.Buffer[0, self._handle, ustr.Length]
@propertydefcommand_line(self):
ustr = self._params.CommandLine
buf = ustr.Buffer[0, self._handle, ustr.Length]
return buf
Example:
if __name__ == '__main__':
import os
import sys
import subprocess
import textwrap
classSECURITY_ATTRIBUTES(ctypes.Structure):
_fields_ = (('nLength', wintypes.DWORD),
('lpSecurityDescriptor', wintypes.LPVOID),
('bInheritHandle', wintypes.BOOL))
def__init__(self, *args, **kwds):
super(SECURITY_ATTRIBUTES, self).__init__(*args, **kwds)
self.nLength = ctypes.sizeof(self)
deftest_remote(use_pid=True, show_modules=False):
sa = SECURITY_ATTRIBUTES(bInheritHandle=True)
hEvent = kernel32.CreateEventW(ctypes.byref(sa), 0, 0, None)
try:
script = textwrap.dedent(r"""
import sys
import ctypes
kernel32 = ctypes.WinDLL('kernel32')
kernel32.SetEvent(%d)
sys.stdin.read()""").strip() % hEvent
cmd = '"%s" -c "%s"' % (sys.executable, script)
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
close_fds=False)
try:
kernel32.WaitForSingleObject(hEvent, 5000)
if use_pid:
pi = ProcessInformation(proc.pid)
else:
pi = ProcessInformation(handle=int(proc._handle))
assert pi.process_id == proc.pid
assert pi.image_path == sys.executable
assert pi.command_line == cmd
assert pi.module_names[0] == sys.executable
if show_modules:
print('\n'.join(pi.module_names))
finally:
proc.terminate()
finally:
kernel32.CloseHandle(hEvent)
print('Test 1: current process')
pi = ProcessInformation()
assert os.getpid() == pi.process_id
assert pi.image_path == pi.module_names[0]
print('Test 2: remote process (Handle)')
test_remote(use_pid=False)
print('Test 3: remote process (PID)')
test_remote(show_modules=True)
Output in Windows 10, using 64-bit Python 3.5:
Test 1: current process
Test 2: remote process (Handle)
Test 3: remote process (PID)
C:\Program Files\Python35\python.exe
C:\Windows\SYSTEM32\ntdll.dll
C:\Windows\system32\KERNEL32.DLL
C:\Windows\system32\KERNELBASE.dll
C:\Program Files\Python35\python35.dll
C:\Program Files\Python35\VCRUNTIME140.dll
C:\Windows\SYSTEM32\ucrtbase.dll
C:\Windows\system32\ADVAPI32.dll
C:\Windows\system32\WS2_32.dll
C:\Windows\system32\sechost.dll
C:\Windows\system32\RPCRT4.dll
C:\Windows\system32\NSI.dll
C:\Windows\system32\msvcrt.dll
C:\Windows\SYSTEM32\CRYPTBASE.DLL
C:\Windows\SYSTEM32\bcryptPrimitives.dll
C:\Windows\SYSTEM32\CRYPTSP.dll
C:\Windows\SYSTEM32\bcrypt.dll
C:\Windows\system32\rsaenh.dll
C:\Program Files\Python35\python3.dll
C:\Program Files\Python35\DLLs\_ctypes.pyd
C:\Windows\system32\ole32.dll
C:\Windows\system32\combase.dll
C:\Windows\system32\GDI32.dll
C:\Windows\system32\USER32.dll
C:\Windows\system32\OLEAUT32.dll
C:\Windows\system32\IMM32.DLL
C:\Windows\system32\MSCTF.dll
Solution 2:
I only posted this as a solution since comments are too small and can't be easily formatted.
First, want to emphasize what's on the [MS.Docs]: NtQueryInformationProcess function:
NtQueryInformationProcess may be altered or unavailable in future versions of Windows. Applications should use the alternate functions listed in this topic.
...
The NtQueryInformationProcess function and the structures that it returns are internal to the operating system and subject to change from one release of Windows to another. To maintain the compatibility of your application, it is better to use public functions mentioned in the description of the ProcessInformationClass parameter instead.
If you do use NtQueryInformationProcess, access the function through run-time dynamic linking. This gives your code an opportunity to respond gracefully if the function has been changed or removed from the operating system. Signature changes, however, may not be detectable.
This function has no associated import library. You must use the LoadLibrary and GetProcAddress functions to dynamically link to Ntdll.dll.
Now, if you still want to use this func, you have to define (recursively) all the structures that are used by it. Some of them can be found at: [MS.Docs]: _PEB structure or [MS.Docs]: _PEB_LDR_DATA structure, but you'll find them in VStudio's winternl.h. Again: they may be altered in future versions of Windows.
Also: always check the return codes when dealing with WinAPIs.
I have defined some (not all) of the structures in Python; I also had to change some of the existing ones (note that if a member has an incorrect offset from the struct's beginning it will be messed up):
import ctypes
from ctypes import wintypes
classLIST_ENTRY(ctypes.Structure):
pass
LIST_ENTRY._fields = [
("Flink", ctypes.POINTER(LIST_ENTRY)),
("Blink", ctypes.POINTER(LIST_ENTRY)),
]
classUNICODE_STRING(ctypes.Structure):
_fields_ = [
("Length", wintypes.USHORT),
("MaximumLength", wintypes.USHORT),
("Buffer", ctypes.POINTER(ctypes.c_wchar)),
]
classLDR_DATA_TABLE_ENTRY(ctypes.Structure):
_fields_ = [
("Reserved1", ctypes.c_void_p * 2),
("InMemoryOrderLinks", LIST_ENTRY),
("Reserved2", ctypes.c_void_p * 2),
("DllBase", ctypes.c_void_p),
("EntryPoint", ctypes.c_void_p),
("Reserved3", ctypes.c_void_p),
("FullDllName", UNICODE_STRING),
("Reserved4", wintypes.BYTE * 8),
("Reserved5", ctypes.c_void_p * 3),
("Reserved6", ctypes.c_void_p),
("TimeDateStamp", wintypes.ULONG),
]
classPEB_LDR_DATA(ctypes.Structure):
_fields_ = [
("Reserved1", wintypes.BYTE * 8),
("Reserved2", ctypes.c_void_p * 3),
("InMemoryOrderModuleList", LIST_ENTRY),
]
classPEB(ctypes.Structure):
_fields_ = [
("Reserved1", wintypes.BYTE * 2),
("BeingDebugged", wintypes.BYTE),
("Reserved2", wintypes.BYTE),
("Reserved3", ctypes.c_void_p * 2),
("Ldr", ctypes.POINTER(PEB_LDR_DATA)),
("ProcessParameters", ctypes.POINTER(ctypes.c_int)),
("Reserved4", wintypes.BYTE * 104),
("Reserved5", ctypes.c_void_p * 52),
("PPS_POST_PROCESS_INIT_ROUTINE", ctypes.POINTER(ctypes.c_int)),
("Reserved6", wintypes.BYTE * 128),
("Reserved7", ctypes.c_void_p),
("SessionId", ctypes.c_ulong),
]
classPROCESS_BASIC_INFORMATION(ctypes.Structure):
_fields_ = [
("Reserved1", ctypes.c_void_p),
("PebBaseAddress", ctypes.POINTER(PEB)),
("Reserved2", ctypes.c_void_p * 2),
("UniqueProcessId", ctypes.POINTER(ctypes.c_ulong)),
("Reserved3", ctypes.c_void_p),
]
At the end, I want to add that most likely any data can be obtained using other (supported) methods (functions that are part of public API), even if they might call this func internally.
Post a Comment for "How To Access The Peb Of Another Process With Python Ctypes"