#!/usr/bin/env python3
"""Download Steam Workshop items into a target directory.

Each requested item id is looked up through the
``ISteamRemoteStorage/GetPublishedFileDetails`` endpoint and its file is saved
as ``<publishedfileid>.vpk`` in the output directory. A file that already
exists with the item's ``time_updated`` as its modification time and the
item's ``file_size`` as its size is considered current and is not fetched
again.
"""
import argparse
from os import replace, utime
from os.path import abspath, exists, getatime, getmtime, getsize, join
from sys import argv

from requests import Session

DETAILS_URL = 'http://api.steampowered.com/ISteamRemoteStorage/GetPublishedFileDetails/v1'

# requests applies no timeout by default; without one a stalled server would
# block the script forever. Value is in seconds.
REQUEST_TIMEOUT = 30

# Number of bytes requested per streamed chunk while downloading.
CHUNK_SIZE = 65_536


def _parse_args(cli_args=None):
    """Parse the command line and return the argparse namespace.

    Args:
        cli_args: Argument strings to parse; ``None`` lets argparse read the
            process arguments itself.

    Returns:
        Namespace with ``ids`` (list of int) and ``out`` (absolute path).
    """
    parser = argparse.ArgumentParser(description='Download items from steam workshop.')
    parser.add_argument(
        'ids', metavar='ITEM_ID', nargs='+', type=int,
        help='steam workshop file ids',
    )
    parser.add_argument(
        '-o', '--out', metavar='TARGET_DIR', dest='out', type=abspath, default='.',
        help='output dir',
    )
    return parser.parse_args(cli_args)


def _fetch_item_details(session, item_ids):
    """Request the details of *item_ids* and return the list of item records.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
    """
    payload = {'itemcount': len(item_ids)}
    payload.update(
        (f'publishedfileids[{index}]', item_id)
        for index, item_id in enumerate(item_ids)
    )
    response = session.post(DETAILS_URL, data=payload, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    return response.json()['response']['publishedfiledetails']


def _is_up_to_date(item, target_path):
    """Return True when *target_path* already matches *item*'s time and size."""
    # int() keeps the comparison working whether the API delivers these
    # fields as JSON numbers or as numeric strings.
    return (
        exists(target_path)
        and getmtime(target_path) == int(item['time_updated'])
        and getsize(target_path) == int(item['file_size'])
    )


def _download_item(session, item, target_path):
    """Stream *item*'s file to *target_path*, printing one dot per chunk.

    The data is written to ``<target_path>.part`` first and only moved over
    the real name once it is complete and stamped with ``time_updated``, so
    an interrupted transfer never leaves a truncated file under the final
    name.

    Raises:
        ValueError: If the item record carries no download URL.
        requests.HTTPError: If the download answers with an error status.
    """
    file_url = item.get('file_url')
    if not file_url:
        # Fail with a readable message instead of an opaque KeyError or
        # invalid-URL error coming out of the HTTP layer.
        raise ValueError(
            f"file '{item['publishedfileid']}' has no download url: {item}"
        )

    part_path = f'{target_path}.part'
    # The context manager releases the streamed connection even on failure.
    with session.get(file_url, stream=True, timeout=REQUEST_TIMEOUT) as response:
        response.raise_for_status()
        with open(part_path, 'wb') as file:
            for chunk in response.iter_content(chunk_size=CHUNK_SIZE):
                print('.', end='', flush=True)
                if chunk:
                    file.write(chunk)

    # The modification time doubles as the "version" marker checked by
    # _is_up_to_date() on the next run; access time is left as it was.
    utime(part_path, (getatime(part_path), int(item['time_updated'])))
    replace(part_path, target_path)


def main(cli_args=None):
    """Download every requested item, skipping those already up to date.

    Args:
        cli_args: Argument strings to parse; ``None`` lets argparse read the
            process arguments itself.

    Raises:
        ValueError: If the details lookup reports a failure for an item
            (``result`` other than 1) or an item has no download URL.
    """
    args = _parse_args(cli_args)
    print(f"item ids: {', '.join(str(item_id) for item_id in args.ids)}", flush=True)
    print(f"target dir: {args.out}", flush=True)

    # The session is closed on every exit path, including errors.
    with Session() as session:
        for item in _fetch_item_details(session, args.ids):
            # file found?
            if item['result'] != 1:
                raise ValueError(
                    f"getting file '{item['publishedfileid']}' info failed: {item}"
                )

            target_path = join(args.out, f"{item['publishedfileid']}.vpk")
            print(f"- {item['title']}: ", end='', flush=True)

            if _is_up_to_date(item, target_path):
                print('already satisfied', flush=True)
                continue

            print('downloading', end='', flush=True)
            _download_item(session, item, target_path)
            print(' done', flush=True)


if __name__ == '__main__':
    main(argv[1:])