#!/usr/bin/env python3 # LOGIN # https://steam.readthedocs.io/en/latest/api/steam.webauth.html import steam.webauth as wa from os import environ import imaplib import email from time import sleep # get previously known message ids from imap server M = imaplib.IMAP4_SSL(environ['IMAP_HOST']) M.login(environ['IMAP_USER'], environ['IMAP_PASSWORD']) M.select() typ, data = M.search(None, '(FROM "noreply@steampowered.com" SUBJECT "Your Steam account: Access from new web or mobile device")') old_msg_ids = data[0].split() # perform steam login user = wa.WebAuth(environ['STEAM_USERNAME']) try: user.login(environ['STEAM_PASSWORD']) except (wa.LoginIncorrect) as exp: raise except (wa.CaptchaRequired) as exp: print(user.captcha_url) user.login(password=environ['STEAM_PASSWORD'], captcha=input("Captcha: ")) except wa.EmailCodeRequired: while True: typ, data = M.search(None, '(FROM "noreply@steampowered.com" SUBJECT "Your Steam account: Access from new web or mobile device")') newest_msg_id = data[0].split()[-1] if newest_msg_id in old_msg_ids: sleep(1) else: break typ, data = M.fetch(newest_msg_id, '(RFC822)') msg = email.message_from_bytes(data[0][1]) for payload in msg.get_payload(): if ( payload.get_content_maintype() == 'text' and payload.get_content_subtype() == 'plain' ): plaintext_lines = payload.get_payload(decode=True).decode().splitlines() code = plaintext_lines[plaintext_lines.index('Login Code') + 1] break user.login(email_code=code) except wa.TwoFactorCodeRequired: user.login(twofactor_code=input("2FA Code: ")) # CRAWL from bs4 import BeautifulSoup from datetime import datetime, timezone import pytz import re from urllib.parse import urlparse import requests pdt = pytz.timezone("Us/Pacific") def parse_trs(trs): for tr in trs: tds = tr.find_all('td') from_url = tds[0].find('a')['href'] from_name = tds[0].text to_url = tds[1].find('a')['href'] to_name = tds[1].text date = datetime.strptime(tds[2].text, '%b %d, %Y @ %I:%M%p PDT').replace(tzinfo=pdt) text = tds[3].text print(f'({date}) {from_name} -> {to_name}: {text}') # download steamuserimages for url_string in re.findall(r'(https?://\S+)', text): url = urlparse(url_string) if url.netloc.startswith('steamuserimages'): response = requests.get(url_string) with open(url.path.strip('/').replace('/', '_'), "wb") as f: f.write(response.content) # get first page r = user.session.get('https://help.steampowered.com/en/accountdata/GetFriendMessagesLog') soup = BeautifulSoup(r.text, 'html.parser') continue_value = soup.find(class_='AccountDataLoadMore')['data-continuevalue'] account_data_table = soup.find(id='AccountDataTable_1') trs = account_data_table.find_all('tr')[1:] parse_trs(trs) # get further pages while True: r = user.session.get(f'https://help.steampowered.com/en/accountdata/AjaxLoadMoreData/?url=GetFriendMessagesLog&continue={continue_value}') continue_value = r.json()['continue'] html = r.json()['html'] trs = BeautifulSoup(html, 'html.parser').find_all('tr') parse_trs(trs) # CLOSE M.close() M.logout()