ansible/test/integration/cleanup_gce.py

91 lines
2.6 KiB
Python
Raw Normal View History

2014-03-30 00:27:48 +00:00
'''
Find and delete GCE resources matching the provided --match string. Unless
--yes|-y is provided, prompt for confirmation prior to deleting resources.
Please use caution, you can easily delete your *ENTIRE* GCE infrastructure.
'''
import optparse
2014-03-30 00:27:48 +00:00
import os
import re
import sys
import yaml
try:
from libcloud.common.google import (
GoogleBaseError,
QuotaExceededError,
ResourceExistsError,
ResourceInUseError,
ResourceNotFoundError,
)
2014-03-30 00:27:48 +00:00
from libcloud.compute.providers import get_driver
from libcloud.compute.types import Provider
2014-03-30 00:27:48 +00:00
_ = Provider.GCE
except ImportError:
print("failed=True msg='libcloud with GCE support (0.13.3+) required for this module'")
2014-03-30 00:27:48 +00:00
sys.exit(1)
import gce_credentials
2014-03-30 00:27:48 +00:00
def delete_gce_resources(get_func, attr, opts):
    '''
    Offer for deletion every resource whose attribute matches opts.match_re.

    :param get_func: zero-argument callable returning an iterable of resources.
    :param attr: name of the attribute on each resource to test against the
        regular expression.
    :param opts: parsed options; ``match_re`` (the pattern) and ``assumeyes``
        (skip confirmation) are read from it.
    '''
    for resource in get_func():
        candidate = getattr(resource, attr)
        # The pattern is applied case-insensitively and may match anywhere in
        # the value unless the expression itself is anchored.
        if re.search(opts.match_re, candidate, re.IGNORECASE) is None:
            continue
        question = "Delete matching %s? [y/n]: " % (resource,)
        prompt_and_delete(resource, question, opts.assumeyes)
2014-03-30 00:27:48 +00:00
def prompt_and_delete(item, prompt, assumeyes):
    '''
    Destroy a single resource, asking for confirmation first when required.

    :param item: resource object; it must expose a ``destroy()`` method.
    :param prompt: text shown to the user when confirmation is needed.
    :param assumeyes: when true, skip the prompt and destroy immediately.
    :raises AssertionError: if ``item`` has no ``destroy`` attribute.
    '''
    # Validate up front with an explicit raise: a bare ``assert`` is stripped
    # under ``python -O``, and checking before prompting avoids asking the
    # user about something that cannot be destroyed anyway.
    if not hasattr(item, 'destroy'):
        raise AssertionError("Class <%s> has no delete attribute" % item.__class__)

    if not assumeyes:
        # ``raw_input`` only exists on Python 2; Python 3 renamed it to
        # ``input``.  Python 2's own ``input`` evaluates what is typed, so
        # ``raw_input`` must be preferred whenever it is available.
        try:
            read_line = raw_input
        except NameError:
            read_line = input
        # Only an answer of "y" (any case, surrounding whitespace ignored)
        # counts as confirmation.
        assumeyes = read_line(prompt).strip().lower() == 'y'

    if assumeyes:
        item.destroy()
        print("Deleted %s" % item)
2014-03-30 00:27:48 +00:00
def parse_args():
    '''
    Build the option parser for this script and parse the command line.

    Credential-related options are contributed by ``gce_credentials``; this
    function adds ``--yes``/``-y`` and ``--match`` on top of them and then
    hands the parsed options to ``gce_credentials.check_required``.

    :returns: the ``(opts, args)`` pair produced by ``optparse``.
    '''
    parser = optparse.OptionParser(
        description=__doc__,
        usage="%s [options]" % sys.argv[0],
    )

    # Credential options are registered before the script's own options.
    gce_credentials.add_credentials_options(parser)

    parser.add_option(
        "--yes", "-y",
        dest="assumeyes",
        action="store_true",
        default=False,
        help="Don't prompt for confirmation",
    )
    parser.add_option(
        "--match",
        dest="match_re",
        action="store",
        default="^ansible-testing-",
        help="Regular expression used to find GCE resources (default: %default)",
    )

    options, positional = parser.parse_args()
    gce_credentials.check_required(options, parser)
    return (options, positional)
if __name__ == '__main__':
    opts, args = parse_args()

    # Obtain a GCE driver from the options given on the command line.
    gce = gce_credentials.get_gce_driver(opts)

    def get_snapshots():
        '''Iterate over the snapshots of every volume the driver lists.'''
        return (
            snap
            for vol in gce.list_volumes()
            for snap in gce.list_volume_snapshots(vol)
        )

    try:
        # Resources are processed in a fixed order: instances, then
        # snapshots, then disks.
        # NOTE(review): presumably disks go last because they may still be
        # attached to instances -- confirm.
        delete_gce_resources(gce.list_nodes, 'name', opts)
        delete_gce_resources(get_snapshots, 'name', opts)
        delete_gce_resources(gce.list_volumes, 'name', opts)
    except KeyboardInterrupt:
        # Ctrl-C at a confirmation prompt ends the run quietly.
        print("\nExiting on user command.")