machine-config/create-server.py

114 lines
3.5 KiB
Python
Executable File

#!/usr/bin/env nix-shell
#! nix-shell -i python3 -p python3 python3Packages.requests
# https://wiki.nixos.org/wiki/Nix-shell_shebang#Python
import requests
import tomllib
import json
import argparse
import os
import base64
import time
nixos_infect_script="""
#!/bin/sh
curl https://raw.githubusercontent.com/elitak/nixos-infect/master/nixos-infect | NIX_CHANNEL=nixos-24.05 bash
"""
parser = argparse.ArgumentParser(description='Create a NixOS Vultr Server.')
parser.add_argument("label", help="Name for the server")
parser.add_argument("--region", default="ord")
parser.add_argument("--plan-type", default="vhp")
parser.add_argument("--vcpu-count", type=int, default=1)
parser.add_argument("--ram", type=int, default=2048)
parser.add_argument("--os", default="Debian 12", help="Prefix search")
parser.add_argument("--hostname")
args = parser.parse_args()
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), "keys.toml"), "rb") as f:
config = tomllib.load(f)
headers = {
"Authorization": f"Bearer {config["vultr-token"]}",
}
plans = requests.get("https://api.vultr.com/v2/plans?per_page=500").json()['plans']
plans = [p for p in plans if p['type'] == args.plan_type]
plans = [p for p in plans if p['vcpu_count'] == args.vcpu_count]
plans = [p for p in plans if p['ram'] == args.ram]
plans = [p for p in plans if args.region in p['locations']]
plans = [p for p in plans if "amd" in p['id']] # HACK: don't key on ID
if len(plans) != 1:
print(plans)
raise Exception(f"Expected exactly 1 plan; got {len(plans)}")
plan = plans[0]
print(f"Using plan {plan}")
oses = requests.get("https://api.vultr.com/v2/os?per_page=500").json()['os']
oses = [o for o in oses if o['name'].startswith(args.os)]
if len(oses) != 1:
print(oses)
raise Exception(f"Expected exactly 1 OS; got {len(oses)}")
chosen_os = oses[0]
sshkey_id = requests.get("https://api.vultr.com/v2/ssh-keys", headers=headers).json()['ssh_keys'][0]['id']
data = {
"plan": plan['id'],
"region": args.region,
"label": args.label,
"os_id": chosen_os['id'],
"backups": "disabled",
"hostname": args.hostname or args.label.split('.')[0],
"sshkey_id": [sshkey_id],
"user_data": base64.b64encode(nixos_infect_script.encode()).decode(),
}
print(data)
if input("Confirm? ")[0] != 'y':
raise Exception('aborted')
res = requests.post("https://api.vultr.com/v2/instances", headers=headers, data=json.dumps(data)).json()
print(res)
print("Checking for ipv4...", end="", flush=True)
while res['instance']['main_ip'] == '0.0.0.0':
print('.', end="", flush=True)
res = requests.get(f"https://api.vultr.com/v2/instances/{res['instance']['id']}", headers=headers).json()
time.sleep(5)
print(res['instance']['main_ip'])
# Create DNS record
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {config['cloudflare-token']}",
}
# get zone id
zone_name = args.label.split('.',maxsplit=1)[1]
zones = requests.get("https://api.cloudflare.com/client/v4/zones", headers=headers).json()['result']
zones = [z for z in zones if z['name'] == zone_name]
if len(zones) != 1:
print(zones)
raise Exception(f"Expected 1 zone; got {len(zones)}")
url = f"https://api.cloudflare.com/client/v4/zones/{zones[0]['id']}/dns_records"
payload = {
"content": res['instance']['main_ip'],
"name": args.label,
"proxied": False,
"type": "A",
"id": os.urandom(15).hex(), # not sure why this is required
"ttl": 1, # automatic
}
response = requests.request("POST", url, json=payload, headers=headers)
print("Created DNS record!")
print(response.text)