2024 Mid-Year Summary
Before I knew it, half of 2024 had already passed. Many things from the past six months are worth recording.
Traffic alert.
After using computers for so many years, I have collected a number of applications that enhance usability and improve efficiency (and ended up with a cluttered taskbar). I'm taking this opportunity to share them, with periodic updates.
Attachment to the topic: http://121.40.89.206/20230311/kheap_9010ffcba2dfbfd58c7ab541015b24ec.zip
Although I've studied the kernel a bit, this is the first attempt at a kernel PWN challenge.
This is actually the very first time I dig into Kernel PWN LOL
Since Kernel Version 6.1, Rust support has been merged into Linux.
I've recently started looking into Kernel Pwn, but the WSL2 kernel does not seem to be the stock one, so I decided to replace it.
Having nothing better to do, I made some casual changes. Actually, I had already done some customization to the system before (like transparent taskbar, frosted glass, and such), but suddenly felt the lack of density of the second dimension, so I decided to make some more changes.
I've been using `AsyncIO` for asynchronous programming in Python, but I've never thought about why it works. Let's take this opportunity to understand `AsyncIO` better.
First of all, we need to understand what an `Iterable` is: basically, an object that can be used in a `for` loop. Common examples of `Iterable` include `list`, `str`, `tuple`, and `dict`.
How does Python determine whether an object is an `Iterable`? We can use the `dir()` function to check its attribute list.
By running the following code, we can see their common interface:
from typing import Iterable

iterable = [
    "",     # str
    [],     # list
    {},     # dict
    (),     # tuple
    set()   # set
]

def show_diff(*objects: Iterable):
    """Print the attribute differences between Iterable and object"""
    assert objects
    attrs = set(dir(objects[0]))
    for obj in objects[1:]:
        attrs &= set(dir(obj))  # Get the intersection of Iterables
    attrs -= set(dir(object))  # Get the difference between Iterable and object
    print(attrs)

show_diff(*iterable)
# {'__iter__', '__contains__', '__len__'}
As we can see, the key attribute is `__iter__`. In fact, any object that defines the `__iter__` method is considered an `Iterable`. Attributes like `__len__` and `__contains__` are common to container-type Iterables.
If we add a non-container `Iterable`, the result becomes obvious:
iterable = [
    "",             # str
    [],             # list
    {},             # dict
    (),             # tuple
    set(),          # set
    open(__file__)  # IO
]
show_diff(*iterable)
# {'__iter__'}
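The same `__iter__` check is available through the standard library: `isinstance(obj, collections.abc.Iterable)` is true for any object whose class defines `__iter__`. The sketch below illustrates this; the `Countdown` class is a hypothetical example, not part of the original post.

```python
from collections.abc import Iterable, Iterator


class Countdown:
    """Hypothetical class: defines __iter__ only, so it is Iterable but not an Iterator."""

    def __init__(self, start):
        self.start = start

    def __iter__(self):
        return iter(range(self.start, 0, -1))


checks = {
    "list": isinstance([], Iterable),
    "int": isinstance(42, Iterable),
    "Countdown": isinstance(Countdown(3), Iterable),
    "Countdown is Iterator": isinstance(Countdown(3), Iterator),
}
print(checks)
print(list(Countdown(3)))
```

Note that `Countdown` never inherits from `Iterable`; having `__iter__` is enough for the check to succeed.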
In Python, special methods such as `__iter__` have a corresponding built-in function that calls them; for `__iter__`, it is `iter()`.
Let's see the results when we use `iter()` on the container-type Iterables listed above:
for i in iterable:
    print(iter(i))
"""
<str_iterator object at 0x7f7bd06fafe0>
<list_iterator object at 0x7f7bd06fafe0>
<dict_keyiterator object at 0x7f7bd08c4b80>
<tuple_iterator object at 0x7f7bd06fafe0>
<set_iterator object at 0x7f7bd0720440>
"""
We can see that they all return an `Iterator` object. As in the `Iterable` section, let's once again find the attributes they have in common:
# {'__next__', '__iter__'}
So, compared to `Iterable`, an `Iterator` has an additional `__next__` method, which returns the next value on each iteration.
Once all values have been consumed, it raises a `StopIteration` exception to signal the end of the iteration.
We can build a custom Iterator with the following code:
class MyIterator:
    def __init__(self, Iter):
        self.index = 0
        self.data = Iter

    def __next__(self):
        if self.index < len(self.data):  # data left: return the next item
            data = self.data[self.index]
            self.index += 1
            return data
        raise StopIteration  # data exhausted: signal the end of the iteration

    def __iter__(self):
        """Iterators must be iterable"""
        return self

things = ["I", "AM", "ITERABLE", "GOD"]
for i in MyIterator(things):
    print(i)
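To make the protocol concrete, here is a minimal sketch of roughly what a `for` loop does with `iter()`, `next()` and `StopIteration`. The helper name `manual_for` is made up for illustration.

```python
def manual_for(iterable):
    """Roughly what `for item in iterable: ...` does, collecting the items in a list."""
    collected = []
    iterator = iter(iterable)      # calls iterable.__iter__()
    while True:
        try:
            item = next(iterator)  # calls iterator.__next__()
        except StopIteration:      # raised once the data is exhausted
            break
        collected.append(item)
    return collected


words = manual_for(["I", "AM", "ITERABLE"])
print(words)
```

The loop never checks a length or an index; the only end-of-data signal is the `StopIteration` exception.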
Stay tuned for the next parts!
```python
async def small_step():
    ...  # the earlier lines of this listing are truncated in the source (they end with `task...")`)
    t1 = time.time()
    await Awaitable(sleep, 2)
    assert time.time() - t1 > 2, "You didn't block, silly pig"
    print(" I'm finished")
    return 123


class Awaitable:
    def __init__(self, *obj):
        self.obj = obj

    def __await__(self):
        yield self.obj


class Task:
    def __init__(self, _task):
        self.coro = _task

    def run(self):
        while True:
            try:
                x = self.coro.send(None)  # resume the coroutine until its next yield
            except StopIteration as _e:
                result = _e.value  # the coroutine's return value
                break
            else:
                func, arg = x  # the yielded (function, argument) pair
                func(arg)
        return result


Task(task()).run()
```
Returning to our `small_step`, we are using a hard-coded blocking mechanism `sleep(2)`, but in reality, there are more types of blocking than just this one. We should aim for a more general mechanism for blocking.
In `Awaitable`, we are directly yielding `self`.
```python
class Awaitable:
    def __init__(self, *obj):
        self.obj = obj

    def __await__(self):
        yield self


class Task:
    def __init__(self, _task):
        self.coro = _task

    def run(self):
        while True:
            try:
                x = self.coro.send(None)
            except StopIteration as _e:
                result = _e.value
                break
            else:
                func, arg = x.obj  # x is now the Awaitable itself
                func(arg)
        return result
```
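The mechanics that `Task.run()` relies on can be seen in isolation: `send(None)` resumes a coroutine until its next `yield`, and the coroutine's return value arrives as `StopIteration.value`. This is a minimal sketch; the `Pause` class and the `job` coroutine are hypothetical names used only here.

```python
class Pause:
    """Hypothetical awaitable that hands `payload` to whoever drives the coroutine."""

    def __init__(self, payload):
        self.payload = payload

    def __await__(self):
        yield self            # suspension point: control returns to the driver
        return "resumed"      # becomes the value of the `await` expression


async def job():
    status = await Pause("need 2 seconds")
    return f"{status}, result=123"


coro = job()
yielded = coro.send(None)     # run until the first suspension; returns the yielded object
try:
    coro.send(None)           # resume; the coroutine now runs to completion
except StopIteration as stop:
    result = stop.value       # the coroutine's return value travels in the exception
print(yielded.payload, "->", result)
```

Between the two `send()` calls the driver is free to do anything else, which is exactly the gap a scheduler exploits.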
Now, notice one thing: our `Task.run()` function is still blocking, and we haven't completely yielded control of our program's execution. Let's continue to modify the `Task` code.
class Task:
    def __init__(self, _task):
        self.coro = _task
        self._done = False
        self._result = None

    def run(self):
        if not self._done:
            try:
                x = self.coro.send(None)
            except StopIteration as _e:
                self._result = _e.value
                self._done = True
        else:
            ...  # This should not happen, an exception should be raised

t = Task(task())
t.run()
for i in range(10):  # During sleep(2), we can do other things.
    print("doing something", i)
    sleep(0.2)
t.run()
We are manually scheduling multiple tasks here. In reality, we should schedule tasks automatically through an event loop.
Event Loop
First, tasks need a queue. We can use a `deque` (double-ended queue) to store them.
class Event:
    def __init__(self):
        self._queue = collections.deque()

    def call_soon(self, callback, *args, **kwargs):
        self._queue.append((callback, args, kwargs))
Next, we add scheduled tasks. Because they must run in order of their due time, we store them in a heap and use `heapq` to operate on it.
class Event:
    def __init__(self):
        self._queue = collections.deque()
        self._scheduled = []

    def call_soon(self, callback, *args, **kwargs):
        self._queue.append((callback, args, kwargs))

    def call_later(self, delay, callback, *args, **kwargs):
        _t = time.time() + delay
        heapq.heappush(self._scheduled, (_t, callback, args, kwargs))
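One caveat with storing `(_t, callback, args, kwargs)` tuples in a heap: tuples are compared element by element, so if two entries ever share the same `_t`, Python falls back to comparing the callbacks, and functions do not support `<`. A common remedy is a monotonically increasing counter as the second tuple element. The sketch below assumes that approach; the names `_sequence`, `push`, `pop_due` and `make` are illustrative and not part of the original `Event` class.

```python
import heapq
import itertools

_sequence = itertools.count()   # hypothetical tie-breaker counter
scheduled = []


def push(when, callback):
    # (when, seq, callback): if two entries share `when`, the unique `seq`
    # decides the order, so the callbacks themselves are never compared.
    heapq.heappush(scheduled, (when, next(_sequence), callback))


def pop_due(now):
    """Pop and return every callback whose time has come, earliest first."""
    due = []
    while scheduled and scheduled[0][0] <= now:
        _, _, callback = heapq.heappop(scheduled)
        due.append(callback)
    return due


def make(name):
    def callback():
        return name
    return callback


push(3.0, make("c"))
push(1.0, make("a"))
push(1.0, make("a2"))   # same timestamp as "a": fine thanks to the counter
push(2.0, make("b"))
first = [cb() for cb in pop_due(1.5)]
rest = [cb() for cb in pop_due(10.0)]
print(first, rest)
```

Entries with equal timestamps come out in insertion order, which is usually what a scheduler wants.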
Let's write the event scheduling function.
class Event:
    def __init__(self):
        self._queue = collections.deque()
        self._scheduled = []
        self._stopping = False

    def call_soon(self, callback, *args, **kwargs):
        self._queue.append((callback, args, kwargs))

    def call_later(self, delay, callback, *args, **kwargs):
        _t = time.time() + delay
        heapq.heappush(self._scheduled, (_t, callback, args, kwargs))

    def stop(self):
        self._stopping = True

    def run_forever(self):
        while True:
            self.run_once()  # At least one execution is necessary, so put the condition check below
            if self._stopping:
                break

    def run_once(self):
        now = time.time()
        # Move the earliest scheduled callback to the ready queue once it is due
        if self._scheduled and now > self._scheduled[0][0]:
            _, cb, args, kwargs = heapq.heappop(self._scheduled)
            self._queue.append((cb, args, kwargs))
        task_num = len(self._queue)  # Prevent adding more tasks to the queue during execution
        for _ in range(task_num):
            cb, args, kwargs = self._queue.popleft()
            cb(*args, **kwargs)

t = Task(task())
loop = Event()
loop.call_soon(t.run)
loop.call_later(2, t.run)
loop.call_later(2.1, loop.stop)
loop.run_forever()
Now, let's modify small_step
async def small_step():
    t1 = time.time()
    time_ = random.random()  # a delay between 0 and 1 second
    await Awaitable(time_)
    assert time.time() - t1 > time_, f"{time_} You didn't block, silly pig {time.time() - t1}"
    return time_
As this time has been passed to `Task`, we need to handle it in `Task`, which means adding a `loop.call_later()` while blocking.
class Task:
    def __init__(self, _task):
        self.coro = _task
        self._done = False
        self._result = None

    def run(self):
        if not self._done:
            try:
                x = self.coro.send(None)
            except StopIteration as _e:
                self._result = _e.value
                self._done = True
            else:
                # x.obj holds the delay; resume this task once it has elapsed
                loop.call_later(*x.obj, self.run)
        else:
            ...  # This should not happen, an exception should be raised
Now, we can remove the manually specified call_later
t = Task(task())
loop = Event()
loop.call_soon(t.run)
loop.call_later(1.1, loop.stop) # random() will only yield values between 0 and 1
loop.run_forever()
Finally, let's try implementing multiple tasks and actually demonstrate the async effect through some parameters.
import collections
import heapq
import itertools
import random
import time
from time import sleep
count = itertools.count(0)
total = 0
async def task():
    """ Create a new task """
    print("TASK BEGIN...")
    print(" MainStep...")
    main_result = await main_step()
    print(f" MainStep Finished with result {main_result}")
    print("TASK END")

async def main_step():
    print(" SmallStep(s)...")
    small_result = await small_step()
    print(f" SmallStep(s) Finished with result {small_result}")
    return small_result * 100

async def small_step():
    t1 = time.time()
    time_ = random.random()
    await Awaitable(time_)
    assert time.time() - t1 > time_, f"{time_} You didn't block, silly pig {time.time() - t1}"
    return time_
class Awaitable:
    def __init__(self, *obj):
        self.obj = obj

    def __await__(self):
        yield self

class Task:
    def __init__(self, _task):
        self.coro = _task
        self._done = False
        self._result = None
        self._id = f"Task-{next(count)}"

    def run(self):
        print(f"--------- {self._id} --------")
        if not self._done:
            try:
                x = self.coro.send(None)
            except StopIteration as _e:
                self._result = _e.value
                self._done = True
            else:
                loop.call_later(*x.obj, self.run)
        else:
            ...  # This should not happen, an exception should be raised
        print("-------------------------")
class Event:
    def __init__(self):
        self._queue = collections.deque()
        self._scheduled = []
        self._stopping = False

    def call_soon(self, callback, *args, **kwargs):
        self._queue.append((callback, args, kwargs))

    def call_later(self, delay, callback, *args, **kwargs):
        _t = time.time() + delay
        global total
        total += delay  # accumulate the total requested delay
        heapq.heappush(self._scheduled, (_t, callback, args, kwargs))

    def stop(self):
        self._stopping = True

    def run_forever(self):
        while True:
            self.run_once()  # At least one execution is necessary, so put the condition check below
            if self._stopping:
                break

    def run_once(self):
        now = time.time()
        if self._scheduled and now > self._scheduled[0][0]:
            _, cb, args, kwargs = heapq.heappop(self._scheduled)
            self._queue.append((cb, args, kwargs))
        task_num = len(self._queue)  # Prevent adding more tasks to the queue during execution
        for _ in range(task_num):
            cb, args, kwargs = self._queue.popleft()
            cb(*args, **kwargs)
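loop = Event()
for _ in range(1000):  # schedule 1000 tasks, matching the figures quoted below
    loop.call_soon(Task(task()).run)
loop.call_later(1.1, loop.stop)
loop.run_forever()
print(f"total delay requested: {total:.1f}s")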
Here, we can see that while we would normally need around 509.3s to run all the tasks one after another, thanks to the concurrent execution achieved through task scheduling, we finished running all 1000 tasks within just 1 second.
Future
Finally, our code actively uses `sleep` to simulate blocking. How should we do this in a real-world scenario?
Typically, we want to perform an operation and obtain a value, as shown below:
async def small_step():
    result = await Awaitable(...)
    return result
In this situation, we should introduce `Future`. What is a `Future`? It represents a result that will only become available in the future. Unlike with `Awaitable`, we cannot pass in the result at the time of creation.
class Future:
    def __init__(self):
        self._result = None
        self._done = False

    def set_result(self, result):
        if self._done:
            raise RuntimeError()  # Disallowed operation
        self._result = result
        self._done = True

    @property
    def result(self):
        if self._done:
            return self._result
        raise RuntimeError()

    def __await__(self):
        yield self
Therefore, we need something that decides when to call `set_result`.
async def small_step():
    fut = Future()
    # do something that will call set_result
    ...
    result = await fut
    return result
In this case, `Task` receives this future, but the future carries no information, only a flag telling us the task is not yet completed.
How does our `Task` know when to resume execution?
We can record a callback in `Future` to signal this.
class Future:
    def __init__(self):
        self._result = None
        self._done = False
        self._callbacks = []

    def add_done_callback(self, cb):
        self._callbacks.append(cb)

    def set_result(self, result):
        if self._done:
            raise RuntimeError()  # Disallowed operation
        self._result = result
        self._done = True
        for cb in self._callbacks:
            cb()  # May have other parameters

    @property
    def result(self):
        if self._done:
            return self._result
        raise RuntimeError()

    def __await__(self):
        yield self
        return self.result  # result = await fut will retrieve this value

class Task:
    def __init__(self, _task):
        self.coro = _task
        self._done = False
        self._result = None
        self._id = f"Task-{next(count)}"

    def run(self):
        print(f"--------- {self._id} --------")
        if not self._done:
            try:
                x = self.coro.send(None)
            except StopIteration as _e:
                self._result = _e.value
                self._done = True
            else:
                x.add_done_callback(self.run)  # resume once the future has a result
        else:
            ...  # This should not happen, an exception should be raised
        print("-------------------------")
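The callback hand-off can be traced end to end with a stripped-down example. This is only a sketch under simplifying assumptions: `MiniFuture`, `reader` and `resume` are hypothetical names, and there is no event loop, so the coroutine is driven by hand.

```python
class MiniFuture:
    """Hypothetical trimmed-down Future, mirroring the callback idea above."""

    def __init__(self):
        self._result = None
        self._done = False
        self._callbacks = []

    def add_done_callback(self, cb):
        self._callbacks.append(cb)

    def set_result(self, result):
        if self._done:
            raise RuntimeError("result already set")
        self._result, self._done = result, True
        for cb in self._callbacks:
            cb()                # wake up whoever is waiting on this future

    def __await__(self):
        yield self              # suspend until someone calls set_result()
        return self._result


async def reader(fut):
    return (await fut) * 2


log = []
fut = MiniFuture()
coro = reader(fut)
pending = coro.send(None)       # the coroutine suspends and hands us the future


def resume():
    try:
        coro.send(None)
    except StopIteration as stop:
        log.append(stop.value)


pending.add_done_callback(resume)
fut.set_result(21)              # triggers resume(), which finishes the coroutine
print(log)
```

Nothing polls the future: the coroutine resumes only because `set_result()` invokes the registered callback.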
Now, compare `Task` and `Future`: both keep a result and a done flag, so `Task` can simply inherit from `Future`.
class Task(Future):
    def __init__(self, _task):
        super().__init__()
        self.coro = _task
        self._id = f"Task-{next(count)}"

    def run(self):
        print(f"--------- {self._id} --------")
        if not self._done:
            try:
                x = self.coro.send(None)
            except StopIteration as _e:
                self.set_result(_e.value)
            else:
                x.add_done_callback(self.run)
        else:
            ...  # This should not happen, an exception should be raised
        print("-------------------------")
At this point, `AsyncIO` is basically implemented. However, compared to Python's own `AsyncIO`, our code is very basic: it lacks performance (since it's not written in C) and has gaps in exception handling and other areas. Finally, here is the optimized code. (The hook-up between `Task` and `loop` was not discussed above, but it is included.)
import collections
import heapq
import itertools
import random
import threading
import time
from time import sleep
count = itertools.count(0)
blocked = 0
async def task():
    """ Create a new task """
    print("TASK BEGIN...")
    print(" MainStep...")
    main_result = await main_step()
    print(f" MainStep Finished with result {main_result}")
    print("TASK END")

async def main_step():
    print(" SmallStep(s)...")
    small_result = await small_step()
    print(f" SmallStep(s) Finished with result {small_result}")
    return small_result * 100

async def small_step():
    fut = Future()
    fake_io(fut)
    result = await fut
    return result
class Future:
    def __init__(self):
        self._result = None
        self._done = False
        self._callbacks = []

    def add_done_callback(self, cb):
        self._callbacks.append(cb)

    def set_result(self, result):
        if self._done:
            raise RuntimeError()  # Disallowed operation
        self._result = result
        self._done = True
        for cb in self._callbacks:
            cb()  # May have other parameters

    @property
    def result(self):
        if self._done:
            return self._result
        raise RuntimeError()

    def __await__(self):
        yield self
        return self.result
class Task(Future):
    def __init__(self, _task):
        super().__init__()
        self._loop = loop
        self.coro = _task
        self._id = f"Task-{next(count)}"
        self._loop.call_soon(self.run)  # schedule the first step as soon as the task is created
        self._start_time = time.time()

    def run(self):
        print(f"--------- {self._id} --------")
        if not self._done:
            try:
                x = self.coro.send(None)
            except StopIteration as _e:
                self.set_result(_e.value)
                global blocked
                blocked += time.time() - self._start_time
            else:
                x.add_done_callback(self.run)
        else:
            ...  # This should not happen, an exception should be raised
        print("-------------------------")
class Event:
    def __init__(self):
        self._queue = collections.deque()
        self._scheduled = []
        self._stopping = False

    def call_soon(self, callback, *args, **kwargs):
        self._queue.append((callback, args, kwargs))

    def call_later(self, delay, callback, *args, **kwargs):
        _t = time.time() + delay
        heapq.heappush(self._scheduled, (_t, callback, args, kwargs))

    def stop(self):
        self._stopping = True

    def run_forever(self):
        while True:
            self.run_once()  # At least one execution is necessary, so put the condition check below
            if self._stopping:
                break

    def run_once(self):
        now = time.time()
        if self._scheduled and now > self._scheduled[0][0] + (10 ** -5):
            _, cb, args, kwargs = heapq.heappop(self._scheduled)
            self._queue.append((cb, args, kwargs))
        task_num = len(self._queue)  # Prevent adding more tasks to the queue during execution
        for _ in range(task_num):
            cb, args, kwargs = self._queue.popleft()
            cb(*args, **kwargs)
def fake_io(fut):
    def read():
        sleep(t_ := random.random())  # IO blocking
        fut.set_result(t_)
    threading.Thread(target=read).start()

def run_until_all_task(tasks):
    # Keep rescheduling this check until every task is done, then stop the loop
    if tasks := [_task for _task in tasks if not _task._done]:
        loop.call_soon(run_until_all_task, tasks)
    else:
        loop.call_soon(loop.stop)

loop = Event()
all_tasks = [Task(task()) for _ in range(1000)]
loop.call_soon(run_until_all_task, all_tasks)
t1 = time.time()
loop.run_forever()
print(time.time() - t1, blocked)
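In the listing above, the worker thread calls `fut.set_result()` directly, so the task's continuation runs on that worker thread. For comparison, here is a minimal sketch of the same fake IO pattern using the real `asyncio`, where `loop.call_soon_threadsafe()` hands the result back to the loop thread instead. The function names mirror the ones above, but the code is an illustration rather than a drop-in replacement, and the 0.05 s sleep is an arbitrary stand-in for IO.

```python
import asyncio
import threading
import time


def fake_io(loop, fut, value):
    """Simulate blocking IO in a worker thread, then report back to the loop."""
    def read():
        time.sleep(0.05)                                  # pretend to block on IO
        loop.call_soon_threadsafe(fut.set_result, value)  # deliver on the loop thread
    threading.Thread(target=read).start()


async def small_step(value):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    fake_io(loop, fut, value)
    return await fut


async def main():
    # Run five steps concurrently; gather() returns results in argument order.
    return await asyncio.gather(*(small_step(i) for i in range(5)))


results = asyncio.run(main())
print(results)
```

All five waits overlap, so the whole run takes roughly one sleep interval rather than five.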
This Content is generated by LLM and might be wrong / incomplete, refer to Chinese version if you find something wrong.
Access the problem platform here
This level is probably designed to familiarize you with how contracts work.
The challenge may lie in getting funds (currently, perhaps only the Goerli faucet can provide some, 0.2 per day).
After the update to version 3.0, the methods for exporting tokens in Steam that worked in the 2.X version are no longer applicable. So I spent some time on reverse engineering and patching, and summarized several methods to export tokens in Steam version 3.0. Of course, all these methods require one precondition: having a rooted device.
I plan to use FastAPI as the backend. Create an environment using Conda, and install FastAPI:
pip install fastapi[all]
Use uvicorn as the server for running the program.
Start by setting up the basic framework:
import uvicorn
from fastapi import FastAPI

app = FastAPI()

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=4345)
As a first version, I plan to use cookies directly for login operations, and may consider switching to username and password in future versions
To implement STEAM login, the relevant requests need to be captured.
First, obtain the `public_key` through `getrsakey/`, with `donotcache` and `username` fields in the payload.
Here, `donotcache` is the timestamp multiplied by 1000 and rounded, and `username` is the plaintext Steam account name.
The returned JSON looks like:
{
    "success": true,
    "publickey_mod": "deadbeef0deadbeef0deadbeef",
    "publickey_exp": "010001",
    "timestamp": "216071450000",
    "token_gid": "deadbeef0deadbee"
}
Given the modulus (`publickey_mod`) and the exponent (`publickey_exp`), we need to generate our public key and encrypt the password with it.
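As a small illustration of the two conversions described so far, the sketch below builds a `donotcache` value and turns the hex strings from the sample response into integers, which is the form an RSA public key is constructed from. The `sample` dict reuses the placeholder values from the JSON above, and `donotcache_now` is a hypothetical helper name.

```python
import time

# Sample response copied from the JSON shown earlier (placeholder values).
sample = {
    "success": True,
    "publickey_mod": "deadbeef0deadbeef0deadbeef",
    "publickey_exp": "010001",
    "timestamp": "216071450000",
}


def donotcache_now() -> int:
    """Current Unix time in milliseconds, as used for `donotcache`."""
    return int(time.time() * 1000)


modulus = int(sample["publickey_mod"], 16)    # hex string -> integer n
exponent = int(sample["publickey_exp"], 16)   # hex string -> integer e
print(exponent, modulus.bit_length(), donotcache_now())
```

A real modulus is far longer than this placeholder; only the parsing step is the same.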
Then request `dologin/` to log in, using different payloads to validate 2FA.
The typical payload looks like:
{
    "donotcache": 1646663656289, // Same timestamp as above
    "password": "base64_encoded_encrypted_password", // RSA public key encrypted binary data encoded in base64
    "username": "username", // Username
    "twofactorcode": "Guard_Code", // Mobile authenticator code
    "emailauth": "", // Email verification code
    "captchagid": 4210307962151791925, // CaptchaGID, retrieved from the value returned by `dologin/`; fetch the Captcha image at `https://steamcommunity.com/login/rendercaptcha/?gid=%captchagid%`
    "captcha_text": "th37yr", // Captcha text, if needed, must exist simultaneously with the item above
    "rsatimestamp": 216071450000, // RSA expiration time, obtainable in `getrsakey/`
    "remember_login": true // Save login information (although we don't need it)
}
The results are conveyed through different return values, for example:
{
    "success": false,
    "requires_twofactor": true,
    "message": ""
}

{
    "success": false,
    "message": "Please enter the characters below to verify this is a human operation.",
    "requires_twofactor": false,
    "captcha_needed": true,
    "captcha_gid": "4209182061243079173"
}
Using `aiohttp` for interaction:
import base64
import rsa
import time
from aiohttp import ClientSession
from typing import Dict
BASE_STEAM_URL = "https://steamcommunity.com"
GET_RSA_KEY_API_URL = "/login/getrsakey/"
DO_LOGIN_API_URL = "/login/dologin/"
LOGIN_URL = "/login?oauth_client_id=DEADBEEF&oauth_scope=read_profile%20write_profile%20read_client%20write_client"
class Response(Dict):
    """A dict whose keys can also be read and written as attributes."""

    def __getattr__(self, item):
        return self.get(item)

    def __setattr__(self, key, value):
        self.__setitem__(key, value)
async def do_login(username: str,
                   password: str,
                   twofactorcode: str = '',
                   emailauth: str = '',
                   captchagid: int = 0,
                   captcha_text: str = '',
                   headers: Dict = None,
                   cookies: Dict = None,
                   **kwargs) -> Response:
    """
    login steam and return the Response
    :param username: steam username
    :param password: steam password, should be plaintext
    :param twofactorcode: optional, steam guard code
    :param emailauth: optional, steam email guard code
    :param captchagid: optional, steam will tell it if needed
    :param captcha_text: optional, captcha text, should be set together with captchagid
    :param headers: optional, custom headers
    :param cookies: optional, custom cookies
    :param kwargs: optional, args for ClientSession
    :return:
    """
    if headers is None:
        headers = {"X-Requested-With": "com.valvesoftware.android.steam.community",
                   "Referer": "https://steamcommunity.com/mobilelogin?oauth_client_id=DEADBEEF&oauth_scope=read_profile%20write_profile%20read_client%20write_client"}
    if cookies is None:
        cookies = {"mobileClientVersion": "0 (2.3.13)",
                   "mobileClient": "android",
                   "Steam_Language": "schinese"}
    async with ClientSession(headers=headers, cookies=cookies, **kwargs) as session:
        # Step 1: fetch the RSA public key for this account
        data = {
            "donotcache": int(time.time() * 1000),
            "username": username
        }
        async with session.post(BASE_STEAM_URL + GET_RSA_KEY_API_URL, data=data) as resp:
            if resp.status == 200 and (response := await resp.json()).get("success"):
                response = Response(response)
                modulus = int(response.publickey_mod, 16)
                exponent = int(response.publickey_exp, 16)
                rsa_timestamp = response.timestamp
            else:
                if resp.status == 200:
                    raise ConnectionError(f"Get RSA Key Error! [{resp.status}]: {response}")
                else:
                    raise ConnectionError(f"Get RSA Key Error! Error Code: {resp.status}")
        # Step 2: encrypt the password with the public key and base64-encode it
        public_key = rsa.PublicKey(modulus, exponent)
        en_password = password.encode(encoding='UTF-8')
        en_password = rsa.encrypt(en_password, public_key)
        en_password = base64.b64encode(en_password)
        # Step 3: send the login request
        data = {
            "donotcache": int(time.time() * 1000),
            "username": username,
            "password": en_password.decode('UTF-8'),
            "twofactorcode": twofactorcode,
            "emailauth": emailauth,
            "rsatimestamp": rsa_timestamp,
            "remember_login": True
        }
        if captchagid and captcha_text:
            data["captchagid"] = captchagid
            data["captcha_text"] = captcha_text
        async with session.post(BASE_STEAM_URL + DO_LOGIN_API_URL, data=data) as resp:
            if resp.status == 200:
                response = Response(await resp.json())
                if response.success:
                    response.cookie = resp.cookies.output()
                    response.cookie_object = resp.cookies
                return response
            else:
                raise ConnectionError(f"Login Error! Error Code: {resp.status}")
Not much to elaborate; I just created a `Response` class to save some time.
It's worth noting that on a successful login, I attach a `cookie` and a `cookie_object` (a `SimpleCookie` object) to the response, as they will be useful later.
TODO: this currently raises a `ConnectionError`; I may create custom exceptions later on.
Before implementing token generation, let's first understand the principle behind it.
First, it's important to recognize that the Steam token generation algorithm is a form of Time-based One-Time Password (TOTP) algorithm.
Steam follows the RFC 6238 standard. In this algorithm, the client and the server need to agree on a common secret to use as the key; it is referred to as `shared_secret` in the token's detailed data.
According to the standard, `T0` (the Unix epoch) and `T1` (the 30 s time step), along with the current timestamp, are used to calculate the message `C` (the counter, i.e. how many `T1` intervals have passed since `T0`). The secret is then used as the key to compute an HMAC over `C`, with SHA-1 as the hash function.
The lowest 4 bits of the last byte of the HMAC are taken as a byte offset.
Starting at that offset, 4 bytes are read from the HMAC. The most significant bit is discarded (to avoid it becoming a sign bit), which leaves 31 bits; read as a decimal number, these bits give the password. In standard TOTP, the password is the last few decimal digits of this number.
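The truncation step is easier to follow in code. Below is a minimal sketch of the standard HOTP computation from RFC 4226, which TOTP builds on by using the time-step counter as `counter`. The function name `hotp` is chosen for illustration; the secret and the expected values are the published test vectors from RFC 4226, Appendix D.

```python
import hashlib
import hmac


def hotp(secret: bytes, counter: int, digits: int = 6) -> str:
    """HOTP value (RFC 4226) for `counter`; TOTP uses counter = (now - T0) // T1."""
    mac = hmac.new(secret, counter.to_bytes(8, "big"), hashlib.sha1).digest()
    offset = mac[-1] & 0x0F                                # low 4 bits of the last byte
    chunk = int.from_bytes(mac[offset:offset + 4], "big")  # 4 bytes starting at offset
    code = chunk & 0x7FFFFFFF                              # clear the top bit -> 31 bits
    return str(code % 10 ** digits).zfill(digits)


# Test secret from RFC 4226, Appendix D.
rfc_secret = b"12345678901234567890"
print(hotp(rfc_secret, 0), hotp(rfc_secret, 1))
# prints "755224 287082"
```

Steam's variant keeps everything up to the 31-bit integer and only replaces the final "modulo 10^digits" step.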
Steam further adapts this by mapping the number onto `CODE_CHARSET` instead of decimal digits. Specifically, the number is divided by the length of `CODE_CHARSET`: the remainder is an index into `CODE_CHARSET`, and the quotient becomes the new number for the next round, until 5 characters are obtained.
I could not find `CODE_CHARSET` or the mapping algorithm in any reliable source. Presumably they were obtained by decompiling the Steam client or through educated trial and error.
It is a sin to reinvent the wheel. Since this is for personal use, and in the spirit of leaning on libraries, I chose the `pyotp` library as a one-click TOTP generation tool.
However, it failed. For reasons I could not determine, the base32 secret was generated incorrectly.
Having gained a thorough understanding of the implementation principle, I decided to implement the algorithm by hand, without ready-made libraries, which also simplifies the project.
import hmac
import hashlib
import time
import base64

def gen_guard_code(shared_secret: str) -> str:
    """
    Generate the Guard Code using `shared_secret`
    :param shared_secret: shared_secret, should be a base64-encoded string
    :return: the guard code
    """
    shared_secret = shared_secret.encode('UTF-8')
    b64_decoded_shared_secret = base64.b64decode(shared_secret)
    time_bytes = (int(time.time()) // 30).to_bytes(8, byteorder='big')  # Turn time_stamp into a 64 bit unsigned int
    hmac_code = hmac.new(b64_decoded_shared_secret, time_bytes, hashlib.sha1).digest()  # Generate HMAC code
    byte_offset = hmac_code[-1] & 0xf  # Get last 4 bits as bytes offset
    code_int = (
        (hmac_code[byte_offset] & 0x7f) << 24 |  # Drop off the first bit (MSB)
        (hmac_code[byte_offset+1] & 0xff) << 16 |
        (hmac_code[byte_offset+2] & 0xff) << 8 |
        (hmac_code[byte_offset+3] & 0xff)
    )
    CODE_CHARSET = [50, 51, 52, 53, 54, 55, 56, 57, 66, 67, 68, 70, 71,
                    72, 74, 75, 77, 78, 80, 81, 82, 84, 86, 87, 88, 89]
    codes = ''
    for _ in range(5):
        code_int, i = divmod(code_int, len(CODE_CHARSET))
        codes += chr(CODE_CHARSET[i])
    return codes
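The character mapping at the end can be checked on its own with small integers. In this sketch, `CODE_CHARSET` is written as a string holding the same characters as the integer list above, and `to_steam_code` is a hypothetical helper that isolates the final loop.

```python
CODE_CHARSET = "23456789BCDFGHJKMNPQRTVWXY"   # same characters as the integer list above


def to_steam_code(code_int: int, length: int = 5) -> str:
    """Map an integer onto `length` characters, least significant 'digit' first."""
    chars = []
    for _ in range(length):
        code_int, index = divmod(code_int, len(CODE_CHARSET))
        chars.append(CODE_CHARSET[index])
    return "".join(chars)


print(to_steam_code(0), to_steam_code(1), to_steam_code(26))
# prints "22222 32222 23222"
```

In effect this is a base-26 conversion written least significant character first, which is why `1` yields `32222` rather than `22223`.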
Trading is perhaps one of the most complicated aspects of Steam. It requires `identity_secret` and `device_id` as parameters.
Packet sniffing on the mobile client shows that the confirmation page's API URL is https://steamcommunity.com/mobileconf/conf?%payload%
First, let's implement `fetch_confirmation_query_params`, i.e. fetching the confirmation list.
The required parameters are:
Param | Description |
---|---|
p | device_id |
a | steam_id |
t | Timestamp |
m | Device (Android / iOS) |
tag | Tag; the only value used here is conf (pending confirmations) |
k | timehash: the HMAC of time_stamp and tag, keyed with identity_secret and encoded in base64 |
First, generate the timehash:
import base64
import hashlib
import hmac
import time

def gen_confirmation_key(times: int, identity_secret: str, tag: str = 'conf') -> str:
    """
    Generate the secret for confirmation to check.
    :param times: time_stamp, should be int instead of float
    :param identity_secret:
    :param tag: 'conf', 'allow', 'cancel', 'details%id%'
    :return: base64-encoded secret, which is not urlencoded.
    """
    msg = times.to_bytes(8, byteorder='big') + tag.encode('UTF-8')  # 8-byte big-endian timestamp followed by the tag
    key = base64.b64decode(identity_secret.encode('UTF-8'))
    secret = hmac.new(key, msg, hashlib.sha1).digest()
    return base64.b64encode(secret).decode('UTF-8')
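Because the key is base64 and not URL-encoded, it has to be percent-encoded when it is placed in a query string. The sketch below shows this with `urllib.parse.urlencode`. It re-implements the key function under the hypothetical name `confirmation_key` so that it runs on its own, and every value (secret, device id, steam id, timestamp) is a made-up placeholder.

```python
import base64
import hashlib
import hmac
from urllib.parse import urlencode


def confirmation_key(timestamp: int, identity_secret: str, tag: str = "conf") -> str:
    """Same computation as gen_confirmation_key above, repeated to keep this sketch standalone."""
    msg = timestamp.to_bytes(8, "big") + tag.encode("utf-8")
    key = base64.b64decode(identity_secret)
    return base64.b64encode(hmac.new(key, msg, hashlib.sha1).digest()).decode("utf-8")


# Placeholder values, purely for illustration.
identity_secret = base64.b64encode(b"not-a-real-secret").decode()
timestamp = 1646663656
key = confirmation_key(timestamp, identity_secret)
query = urlencode({
    "p": "placeholder-device-id",
    "a": "76561198000000000",
    "k": key,
    "t": timestamp,
    "m": "android",
    "tag": "conf",
})
print(query)
```

A SHA-1 digest is 20 bytes, so the base64 key always ends in a single `=`, which `urlencode` turns into `%3D`.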
Next, write the request. The confirmation page does not seem to separate front end and back end, so scraping the page is the only way to get the confirmation list.
import time
from aiohttp import ClientSession
from urllib.parse import urlencode, quote_plus
from typing import Union, Dict, List
from http.cookies import SimpleCookie

BASE_STEAM_URL = "https://steamcommunity.com"
MOBILECONF_URL = "/mobileconf/conf"

async def fetch_confirmation_query(cookies: Union[Dict, SimpleCookie],
                                   steam_id: str,
                                   identity_secret: str,
                                   device_id: str,
                                   tag: str = "conf",
                                   m: str = "android",
                                   headers: Dict = None) -> Dict[str, Union[str, List[Dict]]]:
    """
    fetch confirmation query as a list of json dict.
    :param cookies: Cookies contains login information
    :param steam_id: 64bit steamid
    :param identity_secret:
    :param device_id:
    :param tag: 'conf'
    :param m: 'android', 'ios'
    :param headers:
    :return: Response of confirmation query.
    """
    if headers is None:
        headers = {
            "X-Requested-With": "com.valvesoftware.android.steam.community",
            "Accept-Language": "zh-CN,zh;q=0.9"
        }
    times = int(time.time())
    query = {
        "p": device_id,
        "a": steam_id,
        "k": gen_confirmation_key(times, identity_secret, tag),
        "t": times,
        "m": m,
        "tag": tag
    }
    async with ClientSession(headers=headers, cookies=cookies) as session:
        print(BASE_STEAM_URL + MOBILECONF_URL + '?' + urlencode(query))
        print(urlencode(query, safe=":"), type(urlencode(query)))
        async with session.get(BASE_STEAM_URL + MOBILECONF_URL + '?' + urlencode(query)) as resp:
            if resp.status == 200:
                # do something
                pass
            else:
                raise ConnectionError(f"Fetch Confirmation Error! Error Code: {resp.status}")
As usual, I opted for `beautifulsoup4` as the extractor and `lxml` as the parser.
from bs4 import BeautifulSoup

def steam_confirmation_parser(html: str):
    soup = BeautifulSoup(html, 'lxml')
    confirmations = soup.find_all("div", class_="mobileconf_list_entry")
    if len(confirmations):
        data_list = []
        for confirmation in confirmations:
            data = {
                "type": confirmation.get('data-type'),
                "confid": confirmation.get('data-confid'),
                "key": confirmation.get('data-key'),
                "creator": confirmation.get('data-creator'),
                "accept_text": confirmation.get('data-accept'),
                "cancel_text": confirmation.get('data-cancel'),
                "img": confirmation.find('img')['src'],
                "desc": "\n".join(confirmation.stripped_strings)
            }
            data_list.append(data)
        return {
            "success": True,
            "data": data_list
        }
    # No entries: either the list is empty or the page is not a confirmation page
    empty = soup.find('div', id="mobileconf_empty")
    return {
        "success": empty is not None,  # a plain bool rather than the Tag itself
        "data": ["\n".join(empty.stripped_strings)]
        if empty is not None else ["Invalid Html\nIt is not a parsable html."]
    }
With the above foundation in place, sending requests becomes straightforward.
The URL is https://steamcommunity.com/mobileconf/ajaxop?%payload%
The payload parameters are as follows:
Param | Description |
---|---|
p | device_id |
a | steam_id |
t | Timestamp |
m | Device (Android / iOS) |
op | Action, either cancel or allow |
k | timehash: the HMAC of time_stamp and op, keyed with identity_secret and encoded in base64 |
cid | data-confid, provided in the <div> tag with class mobileconf_list_entry |
ck | data-key, provided in the <div> tag with class mobileconf_list_entry |
AJAX_POST_URL = "/mobileconf/ajaxop"

async def send_confirmation_ajax(cookies: Union[Dict, SimpleCookie],
                                 steam_id: str,
                                 identity_secret: str,
                                 device_id: str,
                                 cid: str,
                                 ck: str,
                                 op: str = "allow",
                                 m: str = "android",
                                 headers: Dict = None) -> bool:
    """
    Send an AJAX POST to allow/cancel a confirmation
    :param cookies: Cookies contains login information
    :param steam_id: 64bit steamid
    """
    ...  # the rest of this listing is truncated in the source

```python
async def fetch_confirmation_details(cookies: Dict, steam_id: str, identity_secret: str, device_id: str, cid: str, m: str = "android", headers: Dict = None) -> Dict[str, str]:
    """
    Fetch a confirmation's details
    :param cookies: Cookies contains login information
    :param steam_id: 64bit steamid
    :param identity_secret:
    :param device_id:
    :param cid: data-confid
    :param m: 'android', 'ios'
    :param headers:
    :return: The Response
    """
    if headers is None:
        headers = {
            "X-Requested-With": "com.valvesoftware.android.steam.community",
            "Accept-Language": "zh-CN,zh;q=0.9"
        }
    times = int(time.time())
    tag = "details" + cid
    query = {
        "tag": tag,
        "p": device_id,
        "a": steam_id,
        "k": gen_confirmation_key(times, identity_secret, tag),
        "t": times,
        "m": m,
    }
    async with ClientSession(headers=headers, cookies=cookies) as session:
        # DETAIL_URL is not defined in the part of the source that survives here
        async with session.get(BASE_STEAM_URL + DETAIL_URL + cid + '?' + urlencode(query)) as resp:
            if resp.status == 200:
                return await resp.json()
            else:
                raise ConnectionError(f"Fetch Confirmation Details Error! Error Code: {resp.status}")
```