144 lines
		
	
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			144 lines
		
	
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| 
 | |
| # LOGIN
 | |
| 
 | |
| # https://steam.readthedocs.io/en/latest/api/steam.webauth.html
 | |
| 
 | |
| import steam.webauth as wa
 | |
| from os import environ
 | |
| import imaplib
 | |
| import email
 | |
| from time import sleep
 | |
| from os import path
 | |
| 
 | |
| print('loggin into STEAM:', environ['STEAM_USERNAME'])
 | |
| user = wa.MobileWebAuth(environ['STEAM_USERNAME'])
 | |
| 
 | |
| # try oauth token
 | |
| try:
 | |
|     print('trying oauth login')
 | |
|     
 | |
|     with open('oauth_token', 'r') as file:
 | |
|         user.oauth_login(file.read(), steam_id=environ['STEAM_ID'], language='english')
 | |
|     
 | |
|     print('oauth login successful')
 | |
| except (wa.LoginIncorrect, FileNotFoundError) as exp:
 | |
|     print('oauth login failed:', exp)
 | |
|     
 | |
|     # get previously known message ids from imap server
 | |
| 
 | |
|     print('loggin into IMAP Server:', environ['IMAP_HOST'])
 | |
| 
 | |
|     Mailbox = imaplib.IMAP4_SSL(environ['IMAP_HOST'])
 | |
|     Mailbox.login(environ['IMAP_USER'], environ['IMAP_PASSWORD'])
 | |
|     Mailbox.select()
 | |
|     typ, data = Mailbox.search(None, '(FROM "noreply@steampowered.com" SUBJECT "Your Steam account: Access from new web or mobile device")')
 | |
|     old_msg_ids = data[0].split()
 | |
| 
 | |
|     # perform steam login
 | |
| 
 | |
|     try:
 | |
|         user.login(environ['STEAM_PASSWORD'])
 | |
|     except (wa.LoginIncorrect) as exp:
 | |
|         print('ERROR: loggin incorrect')
 | |
|         raise
 | |
|     except (wa.CaptchaRequired) as exp:
 | |
|         print('ERROR: captcha required:', user.captcha_url)
 | |
|         #user.login(password=environ['STEAM_PASSWORD'], captcha=input("Captcha: "))
 | |
|         raise
 | |
|     except wa.TwoFactorCodeRequired:
 | |
|         print('ERROR: 2FA code required')
 | |
|         #user.login(twofactor_code=input("2FA Code: "))
 | |
|         raise
 | |
|     except wa.EmailCodeRequired:
 | |
|         print('getting email code')
 | |
|         while True:
 | |
|             typ, data = Mailbox.search(None, '(FROM "noreply@steampowered.com" SUBJECT "Your Steam account: Access from new web or mobile device")')
 | |
|             newest_msg_id = data[0].split()[-1]
 | |
|             
 | |
|             if newest_msg_id in old_msg_ids:
 | |
|                 print('refreshing messages')
 | |
|                 sleep(1)
 | |
|             else:
 | |
|                 print('messages received')
 | |
|                 break
 | |
|         
 | |
|         typ, data = Mailbox.fetch(newest_msg_id, '(RFC822)')
 | |
|         msg = email.message_from_bytes(data[0][1])
 | |
|         
 | |
|         for payload in msg.get_payload():
 | |
|             if (
 | |
|                 payload.get_content_maintype() == 'text' and
 | |
|                 payload.get_content_subtype() == 'plain'
 | |
|             ):
 | |
|                 plaintext_lines = payload.get_payload(decode=True).decode().splitlines()
 | |
|                 code = plaintext_lines[plaintext_lines.index('Login Code') + 1]
 | |
|                 break
 | |
|         
 | |
|         print('code found:', code)
 | |
|         user.login(email_code=code, language='english')
 | |
| 
 | |
|     with open('oauth_token', 'w') as file:
 | |
|         file.write(user.oauth_token)
 | |
| 
 | |
|     print('password login successful')
 | |
| 
 | |
| # CRAWL
 | |
| 
 | |
| from bs4 import BeautifulSoup
 | |
| from datetime import datetime, timezone
 | |
| import pytz
 | |
| import re
 | |
| from urllib.parse import urlparse
 | |
| import requests
 | |
| from hashlib import sha3_256
 | |
| 
 | |
| pdt = pytz.timezone("Us/Pacific")
 | |
| 
 | |
| def parse_trs(trs):
 | |
|     for tr in trs:
 | |
|         tds = tr.find_all('td')
 | |
|         from_url = tds[0].find('a')['href']
 | |
|         from_name = tds[0].text
 | |
|         to_url = tds[1].find('a')['href']
 | |
|         to_name = tds[1].text
 | |
|         date = datetime.strptime(tds[2].text, '%b %d, %Y @ %I:%M%p PDT').replace(tzinfo=pdt)
 | |
|         text = tds[3].text
 | |
|         checksum = sha3_256(
 | |
|             (from_url + to_url + str(date.timestamp()) + text).encode()
 | |
|         ).hexdigest()
 | |
|         print(f'(#{checksum}@{date}) {from_name} -> {to_name}: {text}')
 | |
| 
 | |
|         # download steamuserimages
 | |
|         for url_string in re.findall(r'(https?://\S+)', text):
 | |
|             url = urlparse(url_string)
 | |
|             if url.netloc.startswith('steamuserimages'):
 | |
|                 response = requests.get(url_string)
 | |
|                 with open(url.path.strip('/').replace('/', '_'), "wb") as f:
 | |
|                     f.write(response.content)
 | |
| 
 | |
| # get first page
 | |
| 
 | |
| print('parsing friend messages log')
 | |
| response = user.session.get('https://help.steampowered.com/en/accountdata/GetFriendMessagesLog')
 | |
| soup = BeautifulSoup(response.text, 'html.parser')
 | |
| continue_value = soup.find(class_='AccountDataLoadMore')['data-continuevalue']
 | |
| account_data_table = soup.find(id='AccountDataTable_1')
 | |
| trs = account_data_table.find_all('tr')[1:]
 | |
| 
 | |
| parse_trs(trs)
 | |
| 
 | |
| # get further pages
 | |
| 
 | |
| while True:
 | |
|     print('getting next page')
 | |
|     r = user.session.get(f'https://help.steampowered.com/en/accountdata/AjaxLoadMoreData/?url=GetFriendMessagesLog&continue={continue_value}')
 | |
|     continue_value = r.json()['continue']
 | |
|     html = r.json()['html']
 | |
|     trs = BeautifulSoup(html, 'html.parser').find_all('tr')
 | |
|     parse_trs(trs)
 | |
| 
 | |
| # CLOSE
 | |
| 
 | |
| Mailbox.close()
 | |
| Mailbox.logout()
 | 
