From 9b025bdd6b3bd7677278309a50099c403f7120ec Mon Sep 17 00:00:00 2001 From: mwiegand Date: Tue, 13 Jul 2021 15:28:45 +0200 Subject: [PATCH] wip --- libs/apt.py | 70 ---------------------------------- libs/derive_string.py | 71 ---------------------------------- libs/dns.py | 23 ----------- libs/grafana.py | 29 -------------- libs/ini.py | 24 ------------ libs/keys.py | 15 -------- libs/nextcloud.py | 2 - libs/nginx.py | 27 ------------- libs/ssh.py | 28 -------------- libs/systemd.py | 27 ------------- libs/tools.py | 88 ------------------------------------------- 11 files changed, 404 deletions(-) delete mode 100644 libs/apt.py delete mode 100644 libs/derive_string.py delete mode 100644 libs/dns.py delete mode 100644 libs/grafana.py delete mode 100644 libs/ini.py delete mode 100644 libs/keys.py delete mode 100644 libs/nextcloud.py delete mode 100644 libs/nginx.py delete mode 100644 libs/ssh.py delete mode 100644 libs/systemd.py delete mode 100644 libs/tools.py diff --git a/libs/apt.py b/libs/apt.py deleted file mode 100644 index 0798c41..0000000 --- a/libs/apt.py +++ /dev/null @@ -1,70 +0,0 @@ -# https://manpages.debian.org/jessie/apt/sources.list.5.de.html -from urllib.parse import urlparse -from re import search, sub -from functools import total_ordering - - -@total_ordering -class AptSource(): - def __init__(self, string): - if search(r'\[.*\]', string): - self.options = { - k:v.split(',') for k,v in ( - e.split('=') for e in search(r'\[(.*)\]', string)[1].split() - ) - } - else: - self.options = {} - - parts = sub(r'\[.*\]', '', string).split() - self.type = parts[0] - self.url = urlparse(parts[1]) - self.suite = parts[2] - self.components = parts[3:] - - def __str__(self): - parts = [ - self.type, - self.url.geturl(), - self.suite, - ' '.join(self.components), - ] - - if self.options: - parts.insert( - 1, - "[{}]".format( - ' '.join( - '{}={}'.format( - k, - ','.join(v) - ) for k,v in self.options.items() - ) - ) - ) - - return ' '.join(parts) - - - def __eq__(self, other): - return str(self) == str(other) - - def __lt__(self, other): - return str(self) < str(other) - - def __hash__(self): - return hash(str(self)) - - def __repr__(self): - return f"{type(self).__name__}('{str(self)}')" - - -# source = AptSource('deb [arch=amd64 trusted=true] http://deb.debian.org/debian buster-backports main contrib non-free') -# print(repr(source)) -# print(source.type) -# print(source.options) -# source.options['test'] = ['was', 'ist', 'das'] -# print(source.url) -# print(source.suite) -# print(source.components) -# print(str(source)) diff --git a/libs/derive_string.py b/libs/derive_string.py deleted file mode 100644 index 828103c..0000000 --- a/libs/derive_string.py +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/env python - -from hashlib import sha3_256 -from itertools import count, islice -from Crypto.Cipher import ChaCha20 -from math import floor, ceil -from time import sleep -from os import environ -from sys import stderr - -def debug(*args): - if 'DEBUG' in environ: - print(*args, file=stderr) - -def chacha_bits(input, bit_count): - zerobyte = (0).to_bytes(length=1, byteorder='big') - cipher = ChaCha20.new(key=sha3_256(input).digest(), nonce=zerobyte*8) - i = 0 - - while True: - debug(f'--- BITS {i} ---') - start_bit = bit_count * i - start_byte = start_bit // 8 - start_padding = start_bit % 8 - debug('start_bit', start_bit) - debug('start_byte', start_byte) - debug('start_padding', start_padding) - - end_bit = bit_count * i + bit_count - end_byte = end_bit // 8 - end_padding = 8 - (end_bit % 8) - debug('end_bit', end_bit) - debug('end_byte', end_byte) - debug('end_padding', end_padding) - - byte_count = (end_byte - start_byte) + 1 - debug('byte_count', byte_count) - - cipher.seek(start_byte) - cipherint = int.from_bytes(cipher.encrypt(zerobyte*byte_count), byteorder='big') - debug('ciphertext', bin(cipherint)) - shifted_cipherint = cipherint >> end_padding - debug('shifted_ciphertext', bin(shifted_cipherint)) - - bit_mask = int('1'*bit_count, 2) - debug('bit_mask', bin(bit_mask)) - masked_cipherint = shifted_cipherint & bit_mask - debug('masked_ciphertext', bin(masked_cipherint)) - - debug('') - yield masked_cipherint - i += 1 - - -def chacha_chracter(input, choices): - get_bits = chacha_bits(input, len(choices).bit_length()) - - while True: - key = next(get_bits) - if key < len(choices): - yield choices[key] - - -def derive_string(input, length, choices): - sorted_choices = bytes(sorted(choices)) - get_character = chacha_chracter(input, sorted_choices) - return bytes(islice(get_character, length)) - -# print( -# derive_string(b'12344', length=100, choices=b'abcdefghijklmnopqrstuvwxyz0123456789') -# ) diff --git a/libs/dns.py b/libs/dns.py deleted file mode 100644 index a630e8c..0000000 --- a/libs/dns.py +++ /dev/null @@ -1,23 +0,0 @@ -from ipaddress import ip_interface - -def get_a_records(metadata, internal=True, external=True): - networks = metadata.get('network') - - if not internal: - networks.pop('internal', None) - - if not external: - networks.pop('external', None) - - return { - 'A': [ - str(ip_interface(network['ipv4']).ip) - for network in networks.values() - if 'ipv4' in network - ], - 'AAAA': [ - str(ip_interface(network['ipv6']).ip) - for network in networks.values() - if 'ipv6' in network - ], - } diff --git a/libs/grafana.py b/libs/grafana.py deleted file mode 100644 index 73dc1ec..0000000 --- a/libs/grafana.py +++ /dev/null @@ -1,29 +0,0 @@ -from mako.template import Template -from copy import deepcopy - - -def generate_flux(bucket, host, field, data): - return Template(flux_template).render( - bucket=bucket, - host=host, - field=field, - data=data - ).strip() - - -def generate_panel(bucket, host, title, targets, min=None, max=None): - panel = deepcopy(panel_template) - panel['title'] = title - - if min: - panel['fieldConfig']['defaults']['min'] = min - if max: - panel['fieldConfig']['defaults']['max'] = max - - panel['targets'] = [ - { - 'hide': False, - 'refId': field, - 'query': generate_flux(bucket, host, field, data), - } for field, data in targets.items() - ] diff --git a/libs/ini.py b/libs/ini.py deleted file mode 100644 index 792e52b..0000000 --- a/libs/ini.py +++ /dev/null @@ -1,24 +0,0 @@ -from configparser import ConfigParser - -def parse(text): - config = ConfigParser() - config.read_string(text) - - return { - section: dict(config.items(section)) - for section in config.sections() - } - -class Writable(): - data = '' - - def write(self, line): - self.data += line - -def dumps(dict): - config = ConfigParser() - config.read_dict(dict) - writable = Writable() - config.write(writable) - - return writable.data diff --git a/libs/keys.py b/libs/keys.py deleted file mode 100644 index 427a85f..0000000 --- a/libs/keys.py +++ /dev/null @@ -1,15 +0,0 @@ -import base64 -from nacl.public import PrivateKey -from nacl.encoding import Base64Encoder -from bundlewrap.utils import Fault - -def gen_privkey(repo, identifier): - return repo.vault.random_bytes_as_base64_for(identifier) - -def get_pubkey_from_privkey(identifier, privkey): - # FIXME this assumes the privkey is always a base64 encoded string - def derive_pubkey(): - pub_key = PrivateKey(base64.b64decode(str(privkey))).public_key - return pub_key.encode(encoder=Base64Encoder).decode('ascii') - - return Fault(f'pubkey from privkey {identifier}', derive_pubkey) diff --git a/libs/nextcloud.py b/libs/nextcloud.py deleted file mode 100644 index 78e4086..0000000 --- a/libs/nextcloud.py +++ /dev/null @@ -1,2 +0,0 @@ -def occ(command, *args, **kwargs): - return f"""sudo -u www-data php /opt/nextcloud/occ {command} {' '.join(args)} {' '.join(f'--{name.replace("_", "-")}' + (f'={value}' if value else '') for name, value in kwargs.items())}""" diff --git a/libs/nginx.py b/libs/nginx.py deleted file mode 100644 index dc83e25..0000000 --- a/libs/nginx.py +++ /dev/null @@ -1,27 +0,0 @@ -def render_config(config): - return '\n'.join(render_lines(config)) - -def render_lines(config, indent=0): - lines = [] - blocks = [] - - for key, value in sorted(config.items()): - if isinstance(value, dict): - blocks.extend([ - '', - key+' {', - *render_lines(value, indent=4), - '}', - ]) - elif isinstance(value, list): - lines.extend([ - f'{key} {_value};' for _value in value - ]) - else: - lines.append( - f'{key} {value};' - ) - - return [ - f"{' '*indent}{line}" for line in lines+blocks - ] diff --git a/libs/ssh.py b/libs/ssh.py deleted file mode 100644 index 1bbcbf6..0000000 --- a/libs/ssh.py +++ /dev/null @@ -1,28 +0,0 @@ -from base64 import b64decode, b64encode -from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey -from cryptography.hazmat.primitives import serialization - - -def generate_ed25519_key_pair(secret): - privkey_bytes = Ed25519PrivateKey.from_private_bytes(secret) - - nondeterministic_privatekey = privkey_bytes.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.OpenSSH, - encryption_algorithm=serialization.NoEncryption() - ).decode() - nondeterministic_bytes = b64decode(''.join(nondeterministic_privatekey.split('\n')[1:-2])) - # handle random 32bit number, occuring twice in a row - deterministic_bytes = nondeterministic_bytes[:98] + b'00000000' + nondeterministic_bytes[106:] - deterministic_privatekey = '\n'.join([ - '-----BEGIN OPENSSH PRIVATE KEY-----', - b64encode(deterministic_bytes).decode(), - '-----END OPENSSH PRIVATE KEY-----', - ]) - - public_key = privkey_bytes.public_key().public_bytes( - encoding=serialization.Encoding.OpenSSH, - format=serialization.PublicFormat.OpenSSH, - ).decode() - - return (deterministic_privatekey, public_key) diff --git a/libs/systemd.py b/libs/systemd.py deleted file mode 100644 index b52bb88..0000000 --- a/libs/systemd.py +++ /dev/null @@ -1,27 +0,0 @@ -from mako.template import Template - -template = ''' -% for segment, options in data.items(): - -% if '#' in segment: -# ${segment.split('#', 2)[1]} -% endif -[${segment.split('#')[0]}] -% for option, value in options.items(): -% if isinstance(value, dict): -% for k, v in value.items(): -${option}=${k}=${v} -% endfor -% elif isinstance(value, (list, set, tuple)): -% for item in sorted(value): -${option}=${item} -% endfor -% else: -${option}=${str(value)} -% endif -% endfor -% endfor -''' - -def generate_unitfile(data): - return Template(template).render(data=data).lstrip() diff --git a/libs/tools.py b/libs/tools.py deleted file mode 100644 index d96feec..0000000 --- a/libs/tools.py +++ /dev/null @@ -1,88 +0,0 @@ -from ipaddress import ip_address, ip_network, IPv4Address, IPv4Network - -from bundlewrap.exceptions import NoSuchGroup, NoSuchNode, BundleError -from bundlewrap.utils.text import bold, red -from bundlewrap.utils.ui import io - -def resolve_identifier(repo, identifier): - """ - Try to resolve an identifier (group or node). Return a set of ip - addresses valid for this identifier. - """ - try: - nodes = {repo.get_node(identifier)} - except NoSuchNode: - try: - nodes = repo.nodes_in_group(identifier) - except NoSuchGroup: - try: - ip = ip_network(identifier) - - if isinstance(ip, IPv4Network): - return {'ipv4': {ip}, 'ipv6': set()} - else: - return {'ipv4': set(), 'ipv6': {ip}} - except Exception as e: - io.stderr('{x} {t} Exception while resolving "{i}": {e}'.format( - x=red('✘'), - t=bold('libs.tools.resolve_identifier'), - i=identifier, - e=str(e), - )) - raise - - found_ips = set() - for node in nodes: - for interface, config in node.metadata.get('interfaces', {}).items(): - for ip in config.get('ips', set()): - if '/' in ip: - found_ips.add(ip_address(ip.split('/')[0])) - else: - found_ips.add(ip_address(ip)) - - if node.metadata.get('external_ipv4', None): - found_ips.add(ip_address(node.metadata.get('external_ipv4'))) - - ip_dict = { - 'ipv4': set(), - 'ipv6': set(), - } - - for ip in found_ips: - if isinstance(ip, IPv4Address): - ip_dict['ipv4'].add(ip) - else: - ip_dict['ipv6'].add(ip) - - return ip_dict - - -def remove_more_specific_subnets(input_subnets) -> list: - final_subnets = [] - - for subnet in sorted(input_subnets): - source = ip_network(subnet) - - if not source in final_subnets: - subnet_found = False - - for dest_subnet in final_subnets: - if source.subnet_of(dest_subnet): - subnet_found = True - - if not subnet_found: - final_subnets.append(source) - - out = [] - for net in final_subnets: - out.append(str(net)) - - return out - - -def require_bundle(node, bundle, hint=''): - # It's considered bad style to use assert statements outside of tests. - # That's why this little helper function exists, so we have an easy - # way of defining bundle requirements in other bundles. - if not node.has_bundle(bundle): - raise BundleError(f'{node.name} requires bundle {bundle}, but wasn\'t found! {hint}')