#!/usr/bin/env python3 from __future__ import annotations import argparse import sys from os import path from pathlib import Path import os import time from typing import Any, Callable, DefaultDict, Iterable, Mapping from jinja2 import Template import itertools import subprocess as sub import shutil import string import secrets TEMPLATE_DIR = "./templates" def gen_pass() -> str: alphabet = string.ascii_letters + string.digits password = "".join(secrets.choice(alphabet) for _ in range(20)) return password def check_positive(value: str): ivalue = int(value) if ivalue <= 0: raise Exception("Supplied number must be >= 0") return ivalue def encode_member(member: str, mapping: Mapping[str, Any]) -> str: return member + " " + " ".join([f"{k}={v}" for k, v in mapping.items()]) def iter_ips(ip_format: str, start_octet: int): ip_int = start_octet ip_fmt = ip_format while ip_int < 255: yield ip_fmt.format(ip_int) ip_int += 1 class InventoryWriter: def __init__(self, location: str) -> None: self._file_handle = Path(location) self._groups: dict[str, set[str]] = DefaultDict(set) def add(self, name: str, members: Iterable[str]): self._groups[name] |= set(members) def _build_group(self, name: str, members: set[str]): fmt = f"[{name}]\n" + "\n".join(members) return fmt def flush(self): txt = "" for name, members in self._groups.items(): txt += self._build_group(name, members) + "\n\n" self._file_handle.write_text(txt, encoding="utf8") def copy_template(src: str, dest: str, mapping: Mapping[str, Any] = {}): c = Path(src).read_text() t: Template = Template(c) r = t.render(mapping) Path(dest).write_text(r) def list_envs(args: argparse.Namespace): try: customer_path = path.join("customers", args.customer_name, "envs") print(" ".join(os.listdir(customer_path))) except FileNotFoundError: raise Exception(f"Customer `{args.customer_name}` does not exist.") def delete_env(args: argparse.Namespace): for env in args.env_names: env_path = path.join("customers", args.customer_name, "envs", env) sub.call(["vagrant", "destroy", "-f"], cwd=env_path) shutil.rmtree(env_path) print(f"Deleted `{env}` from customer `{ args.customer_name}`") def create_env(args: argparse.Namespace): if (args.num_nginx_web + args.num_nginx_lb + args.num_postgres) == 0: raise Exception("At least one item should be deployed") env_path = path.join("customers", args.customer_name, "envs", args.env_name) Path(env_path).mkdir(exist_ok=True, parents=True) # Template `ansible.cfg` src = path.join(TEMPLATE_DIR, "ansible.cfg.template") dest = path.join(env_path, "ansible.cfg") copy_template(src=src, dest=dest) # Create inventory file inv_path = path.join(env_path, "inventory") iw = InventoryWriter(inv_path) ip_generator = iter_ips(args.ip_format, args.ip_int) web_ips = itertools.islice(ip_generator, args.num_nginx_web) iw.add("webserver", web_ips) lb_ips = itertools.islice(ip_generator, args.num_nginx_lb) iw.add("loadbalancer", lb_ips) psql_gen_pass: Callable[[str], str] = lambda x: encode_member( x, {"psql_pass": gen_pass()} ) psql_ips = list(itertools.islice(ip_generator, args.num_postgres)) psql_ips = map(psql_gen_pass, psql_ips) iw.add("postgresql", psql_ips) iw.flush() # Template `Vagrantfile` src = path.join(TEMPLATE_DIR, "Vagrantfile.template") dest = path.join(env_path, "Vagrantfile") mapping = { "env": args.env_name, "customer_name": args.customer_name, "ip_int": args.ip_int, "ip_format": args.ip_format.replace("{}", "%d"), "num_webserver": args.num_nginx_web, "num_loadbalancers": args.num_nginx_lb, "num_postgres": args.num_postgres, } copy_template(src=src, dest=dest, mapping=mapping) # Generate .ssh ssh_dir = path.join(env_path, ".ssh") Path(ssh_dir).mkdir(exist_ok=True) ssh_key_cmd = [ "ssh-keygen", "-t", "rsa", "-b", "2048", "-f", path.join(ssh_dir, "id_rsa"), ] sub.call(ssh_key_cmd) # Provision and configure machines sub.call(["vagrant", "up"], cwd=env_path) time.sleep(1) sub.call(["ansible-playbook", "../../../../site.yml"], cwd=env_path) def main() -> int: parser = argparse.ArgumentParser() sub_parser = parser.add_subparsers() list_parser = sub_parser.add_parser("list", help="list customer-owned environments") list_parser.add_argument("customer_name", type=str, help="name of the customer") list_parser.set_defaults(func=list_envs) cenv_parser = sub_parser.add_parser("create", help="create a new environment") cenv_parser.add_argument("customer_name", type=str, help="name of the customer") cenv_parser.add_argument("env_name", type=str, help="name of the environment") cenv_parser.add_argument( "--num-postgres", type=check_positive, help="number of postgres databases", default=0, ) cenv_parser.add_argument( "--num-nginx-web", type=check_positive, help="number of nginx webservers", default=0, ) cenv_parser.add_argument( "--num-nginx-lb", type=check_positive, help="number of nginx loadbalancers", default=0, ) cenv_parser.add_argument( "--ip-format", type=str, help="format of ip", default="192.168.56.{}" ) cenv_parser.add_argument( "--ip-int", type=check_positive, help="4th octet to start at", default=10 ) cenv_parser.set_defaults(func=create_env) denv_parser = sub_parser.add_parser("delete", help="delete an environment") denv_parser.add_argument("customer_name", type=str, help="name of the customer") denv_parser.add_argument( "env_names", type=str, nargs="+", help="name of one or more environments" ) denv_parser.set_defaults(func=delete_env) args = parser.parse_args(sys.argv[1:]) args.func(args) return 0 if __name__ == "__main__": raise SystemExit(main())