This commit is contained in:
mwiegand 2021-07-13 15:28:45 +02:00
parent 0676868e51
commit 9b025bdd6b
11 changed files with 0 additions and 404 deletions

View file

@ -1,70 +0,0 @@
# https://manpages.debian.org/jessie/apt/sources.list.5.de.html
from urllib.parse import urlparse
from re import search, sub
from functools import total_ordering
@total_ordering
class AptSource():
def __init__(self, string):
if search(r'\[.*\]', string):
self.options = {
k:v.split(',') for k,v in (
e.split('=') for e in search(r'\[(.*)\]', string)[1].split()
)
}
else:
self.options = {}
parts = sub(r'\[.*\]', '', string).split()
self.type = parts[0]
self.url = urlparse(parts[1])
self.suite = parts[2]
self.components = parts[3:]
def __str__(self):
parts = [
self.type,
self.url.geturl(),
self.suite,
' '.join(self.components),
]
if self.options:
parts.insert(
1,
"[{}]".format(
' '.join(
'{}={}'.format(
k,
','.join(v)
) for k,v in self.options.items()
)
)
)
return ' '.join(parts)
def __eq__(self, other):
return str(self) == str(other)
def __lt__(self, other):
return str(self) < str(other)
def __hash__(self):
return hash(str(self))
def __repr__(self):
return f"{type(self).__name__}('{str(self)}')"
# source = AptSource('deb [arch=amd64 trusted=true] http://deb.debian.org/debian buster-backports main contrib non-free')
# print(repr(source))
# print(source.type)
# print(source.options)
# source.options['test'] = ['was', 'ist', 'das']
# print(source.url)
# print(source.suite)
# print(source.components)
# print(str(source))

View file

@ -1,71 +0,0 @@
#!/usr/bin/env python
from hashlib import sha3_256
from itertools import count, islice
from Crypto.Cipher import ChaCha20
from math import floor, ceil
from time import sleep
from os import environ
from sys import stderr
def debug(*args):
if 'DEBUG' in environ:
print(*args, file=stderr)
def chacha_bits(input, bit_count):
zerobyte = (0).to_bytes(length=1, byteorder='big')
cipher = ChaCha20.new(key=sha3_256(input).digest(), nonce=zerobyte*8)
i = 0
while True:
debug(f'--- BITS {i} ---')
start_bit = bit_count * i
start_byte = start_bit // 8
start_padding = start_bit % 8
debug('start_bit', start_bit)
debug('start_byte', start_byte)
debug('start_padding', start_padding)
end_bit = bit_count * i + bit_count
end_byte = end_bit // 8
end_padding = 8 - (end_bit % 8)
debug('end_bit', end_bit)
debug('end_byte', end_byte)
debug('end_padding', end_padding)
byte_count = (end_byte - start_byte) + 1
debug('byte_count', byte_count)
cipher.seek(start_byte)
cipherint = int.from_bytes(cipher.encrypt(zerobyte*byte_count), byteorder='big')
debug('ciphertext', bin(cipherint))
shifted_cipherint = cipherint >> end_padding
debug('shifted_ciphertext', bin(shifted_cipherint))
bit_mask = int('1'*bit_count, 2)
debug('bit_mask', bin(bit_mask))
masked_cipherint = shifted_cipherint & bit_mask
debug('masked_ciphertext', bin(masked_cipherint))
debug('')
yield masked_cipherint
i += 1
def chacha_chracter(input, choices):
get_bits = chacha_bits(input, len(choices).bit_length())
while True:
key = next(get_bits)
if key < len(choices):
yield choices[key]
def derive_string(input, length, choices):
sorted_choices = bytes(sorted(choices))
get_character = chacha_chracter(input, sorted_choices)
return bytes(islice(get_character, length))
# print(
# derive_string(b'12344', length=100, choices=b'abcdefghijklmnopqrstuvwxyz0123456789')
# )

View file

@ -1,23 +0,0 @@
from ipaddress import ip_interface
def get_a_records(metadata, internal=True, external=True):
networks = metadata.get('network')
if not internal:
networks.pop('internal', None)
if not external:
networks.pop('external', None)
return {
'A': [
str(ip_interface(network['ipv4']).ip)
for network in networks.values()
if 'ipv4' in network
],
'AAAA': [
str(ip_interface(network['ipv6']).ip)
for network in networks.values()
if 'ipv6' in network
],
}

View file

@ -1,29 +0,0 @@
from mako.template import Template
from copy import deepcopy
def generate_flux(bucket, host, field, data):
return Template(flux_template).render(
bucket=bucket,
host=host,
field=field,
data=data
).strip()
def generate_panel(bucket, host, title, targets, min=None, max=None):
panel = deepcopy(panel_template)
panel['title'] = title
if min:
panel['fieldConfig']['defaults']['min'] = min
if max:
panel['fieldConfig']['defaults']['max'] = max
panel['targets'] = [
{
'hide': False,
'refId': field,
'query': generate_flux(bucket, host, field, data),
} for field, data in targets.items()
]

View file

@ -1,24 +0,0 @@
from configparser import ConfigParser
def parse(text):
config = ConfigParser()
config.read_string(text)
return {
section: dict(config.items(section))
for section in config.sections()
}
class Writable():
data = ''
def write(self, line):
self.data += line
def dumps(dict):
config = ConfigParser()
config.read_dict(dict)
writable = Writable()
config.write(writable)
return writable.data

View file

@ -1,15 +0,0 @@
import base64
from nacl.public import PrivateKey
from nacl.encoding import Base64Encoder
from bundlewrap.utils import Fault
def gen_privkey(repo, identifier):
return repo.vault.random_bytes_as_base64_for(identifier)
def get_pubkey_from_privkey(identifier, privkey):
# FIXME this assumes the privkey is always a base64 encoded string
def derive_pubkey():
pub_key = PrivateKey(base64.b64decode(str(privkey))).public_key
return pub_key.encode(encoder=Base64Encoder).decode('ascii')
return Fault(f'pubkey from privkey {identifier}', derive_pubkey)

View file

@ -1,2 +0,0 @@
def occ(command, *args, **kwargs):
return f"""sudo -u www-data php /opt/nextcloud/occ {command} {' '.join(args)} {' '.join(f'--{name.replace("_", "-")}' + (f'={value}' if value else '') for name, value in kwargs.items())}"""

View file

@ -1,27 +0,0 @@
def render_config(config):
return '\n'.join(render_lines(config))
def render_lines(config, indent=0):
lines = []
blocks = []
for key, value in sorted(config.items()):
if isinstance(value, dict):
blocks.extend([
'',
key+' {',
*render_lines(value, indent=4),
'}',
])
elif isinstance(value, list):
lines.extend([
f'{key} {_value};' for _value in value
])
else:
lines.append(
f'{key} {value};'
)
return [
f"{' '*indent}{line}" for line in lines+blocks
]

View file

@ -1,28 +0,0 @@
from base64 import b64decode, b64encode
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives import serialization
def generate_ed25519_key_pair(secret):
privkey_bytes = Ed25519PrivateKey.from_private_bytes(secret)
nondeterministic_privatekey = privkey_bytes.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.OpenSSH,
encryption_algorithm=serialization.NoEncryption()
).decode()
nondeterministic_bytes = b64decode(''.join(nondeterministic_privatekey.split('\n')[1:-2]))
# handle random 32bit number, occuring twice in a row
deterministic_bytes = nondeterministic_bytes[:98] + b'00000000' + nondeterministic_bytes[106:]
deterministic_privatekey = '\n'.join([
'-----BEGIN OPENSSH PRIVATE KEY-----',
b64encode(deterministic_bytes).decode(),
'-----END OPENSSH PRIVATE KEY-----',
])
public_key = privkey_bytes.public_key().public_bytes(
encoding=serialization.Encoding.OpenSSH,
format=serialization.PublicFormat.OpenSSH,
).decode()
return (deterministic_privatekey, public_key)

View file

@ -1,27 +0,0 @@
from mako.template import Template
template = '''
% for segment, options in data.items():
% if '#' in segment:
# ${segment.split('#', 2)[1]}
% endif
[${segment.split('#')[0]}]
% for option, value in options.items():
% if isinstance(value, dict):
% for k, v in value.items():
${option}=${k}=${v}
% endfor
% elif isinstance(value, (list, set, tuple)):
% for item in sorted(value):
${option}=${item}
% endfor
% else:
${option}=${str(value)}
% endif
% endfor
% endfor
'''
def generate_unitfile(data):
return Template(template).render(data=data).lstrip()

View file

@ -1,88 +0,0 @@
from ipaddress import ip_address, ip_network, IPv4Address, IPv4Network
from bundlewrap.exceptions import NoSuchGroup, NoSuchNode, BundleError
from bundlewrap.utils.text import bold, red
from bundlewrap.utils.ui import io
def resolve_identifier(repo, identifier):
"""
Try to resolve an identifier (group or node). Return a set of ip
addresses valid for this identifier.
"""
try:
nodes = {repo.get_node(identifier)}
except NoSuchNode:
try:
nodes = repo.nodes_in_group(identifier)
except NoSuchGroup:
try:
ip = ip_network(identifier)
if isinstance(ip, IPv4Network):
return {'ipv4': {ip}, 'ipv6': set()}
else:
return {'ipv4': set(), 'ipv6': {ip}}
except Exception as e:
io.stderr('{x} {t} Exception while resolving "{i}": {e}'.format(
x=red(''),
t=bold('libs.tools.resolve_identifier'),
i=identifier,
e=str(e),
))
raise
found_ips = set()
for node in nodes:
for interface, config in node.metadata.get('interfaces', {}).items():
for ip in config.get('ips', set()):
if '/' in ip:
found_ips.add(ip_address(ip.split('/')[0]))
else:
found_ips.add(ip_address(ip))
if node.metadata.get('external_ipv4', None):
found_ips.add(ip_address(node.metadata.get('external_ipv4')))
ip_dict = {
'ipv4': set(),
'ipv6': set(),
}
for ip in found_ips:
if isinstance(ip, IPv4Address):
ip_dict['ipv4'].add(ip)
else:
ip_dict['ipv6'].add(ip)
return ip_dict
def remove_more_specific_subnets(input_subnets) -> list:
final_subnets = []
for subnet in sorted(input_subnets):
source = ip_network(subnet)
if not source in final_subnets:
subnet_found = False
for dest_subnet in final_subnets:
if source.subnet_of(dest_subnet):
subnet_found = True
if not subnet_found:
final_subnets.append(source)
out = []
for net in final_subnets:
out.append(str(net))
return out
def require_bundle(node, bundle, hint=''):
# It's considered bad style to use assert statements outside of tests.
# That's why this little helper function exists, so we have an easy
# way of defining bundle requirements in other bundles.
if not node.has_bundle(bundle):
raise BundleError(f'{node.name} requires bundle {bundle}, but wasn\'t found! {hint}')