#!/usr/bin/env python3
"""Log into Steam and archive the friend messages log into PostgreSQL.

The script runs top to bottom:

1. LOGIN - authenticate a ``steam.webauth.MobileWebAuth`` session, first with
   the token cached in the ``oauth_token`` file and, if that fails, with the
   account password. When Steam asks for an e-mail code, the code is read
   from the newest matching mail on the configured IMAP server.
2. CRAWL - page through the ``GetFriendMessagesLog`` account-data endpoint,
   insert every message that is not yet in the ``messages`` table and
   download any ``steamuserimages`` links found in new messages.

Environment variables read: STEAM_USERNAME, STEAM_ID, STEAM_PASSWORD,
IMAP_HOST, IMAP_USER, IMAP_PASSWORD, DB_NAME, DB_USER, DB_PASSWORD.
"""

import email
import imaplib
import re
from datetime import datetime, timedelta, timezone
from hashlib import sha3_256
from os import environ, makedirs, path
from time import monotonic, sleep
from urllib.parse import urlparse

import pg8000
import requests
# https://steam.readthedocs.io/en/latest/api/steam.webauth.html
import steam.webauth as wa
from bs4 import BeautifulSoup

# File in the working directory that caches the oauth token between runs.
OAUTH_TOKEN_FILE = 'oauth_token'

# IMAP search criteria matching the mails that carry the Steam login code.
CODE_MAIL_QUERY = (
    '(FROM "noreply@steampowered.com" '
    'SUBJECT "Your Steam account: Access from new web or mobile device")'
)

# Upper bound (seconds) for waiting on the login-code mail.
EMAIL_CODE_TIMEOUT = 300

# Timeout (seconds) applied to every HTTP request made by this script.
HTTP_TIMEOUT = 30

# Directory that receives downloaded steamuserimages files.
IMAGE_DIR = 'steamuserimages'

# UTC offsets (hours) of the timezone abbreviations handled by the parser.
TIMEZONE_OFFSETS = {'PST': -8, 'PDT': -7}

# Compiled once; used to find links inside message texts.
URL_PATTERN = re.compile(r'(https?://\S+)')


# LOGIN

def _try_oauth_login(user):
    """Try to log in with the cached oauth token.

    Returns:
        True when the login succeeded, False when the token file is missing
        or Steam rejected the token.
    """
    print('trying oauth login')
    try:
        with open(OAUTH_TOKEN_FILE, 'r') as file:
            user.oauth_login(file.read(), steam_id=environ['STEAM_ID'], language='english')
    except (wa.LoginIncorrect, FileNotFoundError) as exp:
        print('oauth login failed:', exp)
        return False
    print('oauth login successful')
    return True


def _search_code_mails(mailbox):
    """Return the ids (bytes) of all login-code mails in the selected mailbox."""
    typ, data = mailbox.search(None, CODE_MAIL_QUERY)
    # An empty result arrives as b'' (or None); both mean "no matches".
    return (data[0] or b'').split()


def _extract_login_code(msg):
    """Return the login code from a Steam mail.

    The code is the first non-empty line following the line 'Login Code' in a
    text/plain part of the message.

    Raises:
        ValueError: if no text/plain part contains the marker and a code.
    """
    # walk() also covers non-multipart mails and nested multipart containers.
    for part in msg.walk():
        if part.get_content_type() != 'text/plain':
            continue
        raw = part.get_payload(decode=True)
        if raw is None:
            continue
        charset = part.get_content_charset() or 'utf-8'
        lines = [
            line.strip()
            for line in raw.decode(charset, errors='replace').splitlines()
        ]
        if 'Login Code' not in lines:
            continue
        for line in lines[lines.index('Login Code') + 1:]:
            if line:
                return line
    raise ValueError('no login code found in the Steam e-mail')


def _wait_for_email_code(mailbox, old_msg_ids, timeout=EMAIL_CODE_TIMEOUT):
    """Poll the mailbox until a new login-code mail arrives and return its code.

    Args:
        mailbox: logged-in IMAP connection with a mailbox selected.
        old_msg_ids: ids of code mails that existed before the login attempt.
        timeout: maximum number of seconds to wait for the new mail.

    Raises:
        TimeoutError: if no new mail shows up within ``timeout`` seconds.
        ValueError: if the new mail does not contain a login code.
    """
    known_ids = set(old_msg_ids)
    deadline = monotonic() + timeout
    while True:
        msg_ids = _search_code_mails(mailbox)
        # The newest mail is the last id; it must be one we have not seen
        # before the login attempt. An empty result simply means "not yet".
        if msg_ids and msg_ids[-1] not in known_ids:
            print('messages received')
            newest_msg_id = msg_ids[-1]
            break
        if monotonic() >= deadline:
            raise TimeoutError(
                f'no Steam login code e-mail received within {timeout} seconds'
            )
        print('refreshing messages')
        sleep(1)

    typ, data = mailbox.fetch(newest_msg_id, '(RFC822)')
    return _extract_login_code(email.message_from_bytes(data[0][1]))


def _password_login(user):
    """Log in with the account password, answering an e-mail code challenge.

    The ids of already present code mails are recorded *before* the login
    attempt so that the mail triggered by this attempt can be told apart.
    On success the fresh oauth token is written to the token file.

    Raises:
        wa.LoginIncorrect, wa.CaptchaRequired, wa.TwoFactorCodeRequired:
            re-raised after printing a message; these cannot be resolved
            without user interaction.
    """
    print('loggin into IMAP Server:', environ['IMAP_HOST'])
    # The context manager logs out of the IMAP server on every exit path.
    with imaplib.IMAP4_SSL(environ['IMAP_HOST']) as mailbox:
        mailbox.login(environ['IMAP_USER'], environ['IMAP_PASSWORD'])
        mailbox.select()
        old_msg_ids = _search_code_mails(mailbox)

        try:
            user.login(environ['STEAM_PASSWORD'])
        except wa.LoginIncorrect:
            print('ERROR: loggin incorrect')
            raise
        except wa.CaptchaRequired:
            print('ERROR: captcha required:', user.captcha_url)
            raise
        except wa.TwoFactorCodeRequired:
            print('ERROR: 2FA code required')
            raise
        except wa.EmailCodeRequired:
            print('getting email code')
            code = _wait_for_email_code(mailbox, old_msg_ids)
            mailbox.close()
            print('code found:', code)
            user.login(email_code=code, language='english')

    with open(OAUTH_TOKEN_FILE, 'w') as file:
        file.write(user.oauth_token)
    print('password login successful')


# CRAWL

def query(query, **params):
    """Execute a statement with named parameters on ``db`` and commit.

    Args:
        query: SQL text using ``:name`` placeholders.
        **params: values for the placeholders.

    Returns:
        The cursor the statement was executed on.
    """
    cursor = db.cursor()
    cursor.paramstyle = "named"
    cursor.execute(query, params)
    db.commit()
    return cursor


def _parse_timestamp(text):
    """Parse a log timestamp such as 'Jan 02, 2020 @ 3:04PM PST'.

    Returns:
        A timezone-aware datetime carrying the offset of the abbreviation.

    Raises:
        KeyError: if the timezone abbreviation is not in TIMEZONE_OFFSETS.
    """
    timestamp_string, timezone_string = text.rsplit(None, 1)
    timezone_object = timezone(
        timedelta(hours=TIMEZONE_OFFSETS[timezone_string]),
        timezone_string,
    )
    return datetime.strptime(
        timestamp_string, '%b %d, %Y @ %I:%M%p'
    ).replace(tzinfo=timezone_object)


def _download_user_images(message):
    """Download every steamuserimages link in ``message`` into IMAGE_DIR.

    Downloads are best effort: a failed request is reported and skipped so
    that it does not abort the crawl.
    """
    for url_string in URL_PATTERN.findall(message):
        url = urlparse(url_string)
        if not url.netloc.startswith('steamuserimages'):
            continue
        filename = url.path.strip('/').replace('/', '_')
        # Message text is untrusted; refuse names that are no plain file name.
        if filename in ('', '.', '..'):
            continue
        try:
            response = requests.get(url_string, timeout=HTTP_TIMEOUT)
            response.raise_for_status()
        except requests.RequestException as exp:
            print('image download failed:', url_string, exp)
            continue
        makedirs(IMAGE_DIR, exist_ok=True)
        with open(path.join(IMAGE_DIR, filename), "wb") as f:
            f.write(response.content)


def parse_trs(trs):
    """Store the messages contained in table rows of the friend messages log.

    Each row is expected to hold four cells: sender (with link), recipient
    (with link), timestamp and message text. A message is identified by a
    SHA3-256 checksum over both profile URLs, the timestamp and the text;
    rows whose checksum is already in the ``messages`` table are skipped.

    Args:
        trs: iterable of BeautifulSoup ``tr`` elements.
    """
    for tr in trs:
        tds = tr.find_all('td')
        if len(tds) < 4:
            # Not a message row (e.g. a header row without td cells).
            continue

        from_url = tds[0].find('a')['href']
        from_name = tds[0].text
        to_url = tds[1].find('a')['href']
        to_name = tds[1].text
        date = _parse_timestamp(tds[2].text)
        message = tds[3].text

        # The checksum recipe must stay stable: it is the deduplication key
        # for rows that are already stored.
        checksum = sha3_256(
            (from_url + to_url + str(date.timestamp()) + message).encode()
        ).hexdigest()
        print(f'(#{checksum}@{date}) {from_name} -> {to_name}: {message}')

        if query(
            '''
            SELECT 1
            FROM messages
            WHERE checksum = :checksum
            ''',
            checksum=checksum,
        ).rowcount:
            print(f'message {checksum} already exists')
            continue

        print(f'adding new message {checksum}')
        query(
            '''
            INSERT INTO messages (checksum, from_url, from_name, to_url, to_name, date, message)
            VALUES(:checksum, :from_url, :from_name, :to_url, :to_name, :date, :message)
            ''',
            checksum=checksum,
            from_url=from_url,
            from_name=from_name,
            to_url=to_url,
            to_name=to_name,
            date=date,
            message=message,
        )
        # download steamuserimages
        _download_user_images(message)


def _crawl_friend_messages():
    """Walk all pages of the friend messages log and store their rows.

    Paging starts at ``continue=1`` and follows the ``continue`` value of each
    response until it is falsy.
    """
    print('parsing friend messages log')
    continue_value = 1
    while continue_value:
        response = user.session.get(
            f'https://help.steampowered.com/en/accountdata/AjaxLoadMoreData/?url=GetFriendMessagesLog&continue={continue_value}',
            timeout=HTTP_TIMEOUT,
        )
        response.raise_for_status()
        payload = response.json()  # parse the body once per page
        trs = BeautifulSoup(payload['html'], 'html.parser').find_all('tr')
        parse_trs(trs)
        continue_value = payload.get('continue')


print('loggin into STEAM:', environ['STEAM_USERNAME'])
user = wa.MobileWebAuth(environ['STEAM_USERNAME'])
# try oauth token, fall back to password login
if not _try_oauth_login(user):
    _password_login(user)

db = pg8000.connect(
    database=environ['DB_NAME'],
    user=environ['DB_USER'],
    password=environ['DB_PASSWORD'],
)
try:
    _crawl_friend_messages()
finally:
    db.close()