sync routeros logins to 1password
This commit is contained in:
parent
a12edcd360
commit
2b873e4cb8
1 changed files with 132 additions and 0 deletions
132
bin/sync_1password
Executable file
132
bin/sync_1password
Executable file
|
|
@ -0,0 +1,132 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from bundlewrap.repo import Repository
|
||||
from os.path import realpath, dirname
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, List
|
||||
|
||||
bw = Repository(dirname(dirname(realpath(__file__))))
|
||||
|
||||
VAULT=bw.vault.decrypt('encrypt$gAAAAABpLgX_xxb5NmNCl3cgHM0JL65GT6PHVXO5gwly7IkmWoEgkCDSuAcSAkNFB8Tb4RdnTdpzVQEUL1XppTKVto_O7_b11GjATiyQYiSfiQ8KZkTKLvk=').value
|
||||
BW_TAG = "bw"
|
||||
BUNDLEWRAP_FIELD_LABEL = "bundlewrap node id"
|
||||
|
||||
|
||||
@dataclass
|
||||
class OpResult:
|
||||
stdout: str
|
||||
stderr: str
|
||||
returncode: int
|
||||
|
||||
|
||||
def main():
|
||||
for node in bw.nodes_in_group('routeros'):
|
||||
upsert_node_item(
|
||||
node_name=node.name,
|
||||
node_uuid=node.metadata.get('id'),
|
||||
username=node.username,
|
||||
password=node.password,
|
||||
url=f'http://{node.hostname}',
|
||||
)
|
||||
|
||||
|
||||
def run_op(args):
|
||||
proc = subprocess.run(
|
||||
["op", "--vault", VAULT] + args,
|
||||
env=os.environ.copy(),
|
||||
capture_output=True,
|
||||
text=True,
|
||||
)
|
||||
|
||||
if proc.returncode != 0:
|
||||
raise RuntimeError(
|
||||
f"op {' '.join(args)} failed with code {proc.returncode}:\n"
|
||||
f"STDOUT:\n{proc.stdout}\n\nSTDERR:\n{proc.stderr}"
|
||||
)
|
||||
|
||||
return OpResult(stdout=proc.stdout, stderr=proc.stderr, returncode=proc.returncode)
|
||||
|
||||
|
||||
def op_item_list_bw():
|
||||
out = run_op([
|
||||
"item", "list",
|
||||
"--tags", BW_TAG,
|
||||
"--format", "json",
|
||||
])
|
||||
stdout = out.stdout.strip()
|
||||
return json.loads(stdout) if stdout else []
|
||||
|
||||
|
||||
def op_item_get(item_id):
|
||||
args = ["item", "get", item_id, "--format", "json"]
|
||||
return json.loads(run_op(args).stdout)
|
||||
|
||||
|
||||
def op_item_create(title, node_uuid, username, password, url):
|
||||
print(f"creating {title}")
|
||||
return json.loads(run_op([
|
||||
"item", "create",
|
||||
"--category", "LOGIN",
|
||||
"--title", title,
|
||||
"--tags", BW_TAG,
|
||||
"--url", url,
|
||||
"--format", "json",
|
||||
f"username={username}",
|
||||
f"password={password}",
|
||||
f"{BUNDLEWRAP_FIELD_LABEL}[text]={node_uuid}",
|
||||
]).stdout)
|
||||
|
||||
|
||||
def op_item_edit(item_id, title, username, password, url):
|
||||
print(f"updating {title}")
|
||||
return json.loads(run_op([
|
||||
"item", "edit",
|
||||
item_id,
|
||||
"--title", title,
|
||||
"--url", url,
|
||||
"--format", "json",
|
||||
f"username={username}",
|
||||
f"password={password}",
|
||||
]).stdout)
|
||||
|
||||
|
||||
def find_node_item_id(node_uuid):
|
||||
for summary in op_item_list_bw():
|
||||
item_id = summary.get("id")
|
||||
if not item_id:
|
||||
continue
|
||||
|
||||
item = op_item_get(item_id)
|
||||
for field in item.get("fields") or []:
|
||||
label = field.get("label")
|
||||
value = field.get("value")
|
||||
if label == BUNDLEWRAP_FIELD_LABEL and value == node_uuid:
|
||||
return item_id
|
||||
return None
|
||||
|
||||
|
||||
def upsert_node_item(node_name, node_uuid, username, password, url):
|
||||
if item_id := find_node_item_id(node_uuid):
|
||||
return op_item_edit(
|
||||
item_id=item_id,
|
||||
title=node_name,
|
||||
username=username,
|
||||
password=password,
|
||||
url=url,
|
||||
)
|
||||
else:
|
||||
return op_item_create(
|
||||
title=node_name,
|
||||
node_uuid=node_uuid,
|
||||
username=username,
|
||||
password=password,
|
||||
url=url,
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Reference in a new issue