#!/usr/bin/env nix-shell #! nix-shell -i python3 -p python3 python3Packages.requests # https://wiki.nixos.org/wiki/Nix-shell_shebang#Python import requests import tomllib import json import argparse import os import base64 import time nixos_infect_script=""" #!/bin/sh curl https://raw.githubusercontent.com/elitak/nixos-infect/master/nixos-infect | NIX_CHANNEL=nixos-24.05 bash """ parser = argparse.ArgumentParser(description='Create a NixOS Vultr Server.') parser.add_argument("label", help="Name for the server") parser.add_argument("--region", default="ord") parser.add_argument("--plan-type", default="vhp") parser.add_argument("--vcpu-count", type=int, default=1) parser.add_argument("--ram", type=int, default=2048) parser.add_argument("--os", default="Debian 12", help="Prefix search") parser.add_argument("--hostname") args = parser.parse_args() with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), "keys.toml"), "rb") as f: config = tomllib.load(f) headers = { "Authorization": f"Bearer {config["vultr-token"]}", } plans = requests.get("https://api.vultr.com/v2/plans?per_page=500").json()['plans'] plans = [p for p in plans if p['type'] == args.plan_type] plans = [p for p in plans if p['vcpu_count'] == args.vcpu_count] plans = [p for p in plans if p['ram'] == args.ram] plans = [p for p in plans if args.region in p['locations']] plans = [p for p in plans if "amd" in p['id']] # HACK: don't key on ID if len(plans) != 1: print(plans) raise Exception(f"Expected exactly 1 plan; got {len(plans)}") plan = plans[0] print(f"Using plan {plan}") oses = requests.get("https://api.vultr.com/v2/os?per_page=500").json()['os'] oses = [o for o in oses if o['name'].startswith(args.os)] if len(oses) != 1: print(oses) raise Exception(f"Expected exactly 1 OS; got {len(oses)}") chosen_os = oses[0] sshkey_id = requests.get("https://api.vultr.com/v2/ssh-keys", headers=headers).json()['ssh_keys'][0]['id'] data = { "plan": plan['id'], "region": args.region, "label": args.label, "os_id": chosen_os['id'], "backups": "disabled", "hostname": args.hostname or args.label.split('.')[0], "sshkey_id": [sshkey_id], "user_data": base64.b64encode(nixos_infect_script.encode()).decode(), } print(data) if input("Confirm? ")[0] != 'y': raise Exception('aborted') res = requests.post("https://api.vultr.com/v2/instances", headers=headers, data=json.dumps(data)).json() print(res) print("Checking for ipv4...", end="", flush=True) while res['instance']['main_ip'] == '0.0.0.0': print('.', end="", flush=True) res = requests.get(f"https://api.vultr.com/v2/instances/{res['instance']['id']}", headers=headers).json() time.sleep(5) print(res['instance']['main_ip']) # Create DNS record headers = { "Content-Type": "application/json", "Authorization": f"Bearer {config['cloudflare-token']}", } # get zone id zone_name = args.label.split('.',maxsplit=1)[1] zones = requests.get("https://api.cloudflare.com/client/v4/zones", headers=headers).json()['result'] zones = [z for z in zones if z['name'] == zone_name] if len(zones) != 1: print(zones) raise Exception(f"Expected 1 zone; got {len(zones)}") url = f"https://api.cloudflare.com/client/v4/zones/{zones[0]['id']}/dns_records" payload = { "content": res['instance']['main_ip'], "name": args.label, "proxied": False, "type": "A", "id": os.urandom(15).hex(), # not sure why this is required "ttl": 1, # automatic } response = requests.request("POST", url, json=payload, headers=headers) print("Created DNS record!") print(response.text)