# Reconstructed from a `git format-patch` email (commit eb02bc5, "Initial
# commit", Adam Pippin <hello@adampippin.ca>, 2020-03-31) whose text had been
# collapsed onto a few physical lines.  Each banner below is one file of the
# new `mass_s3` package.  Concrete fixes applied during reconstruction:
#   * forked children now terminate via os._exit() inside try/finally, so a
#     worker exception (or a caller catching SystemExit) can never let a
#     child fall back into the parent's control flow or run the parent's
#     atexit handlers;
#   * `_fork` no longer uses a mutable dict as a default argument;
#   * the loop variable shadowing the `object` builtin was renamed;
#   * chunk sizing uses floor division instead of int(a / b);
#   * log calls use lazy %-style arguments instead of string concatenation;
#   * `_list_objects` no longer stops paginating early when a page has no
#     'Contents'; it follows NextContinuationToken to the end;
#   * a malformed --modified-since raises click.BadParameter (clean CLI
#     error) instead of a bare Exception, and non-numeric parts are caught;
#   * unused `pprint` imports removed (the patch shows the complete files,
#     so the removal is provably safe).

# ---------------------------------------------------------------------------
# mass_s3/__init__.py
# ---------------------------------------------------------------------------
"""mass_s3: run an operation on a large number of S3 objects quickly."""

import logging

__author__ = "Adam Pippin"
__email__ = "hello@adampippin.ca"
__version__ = "0.0.1"

# Library convention: emit nothing unless the application configures logging.
logging.getLogger("mass_s3").addHandler(logging.NullHandler())


# ---------------------------------------------------------------------------
# mass_s3/__main__.py
# ---------------------------------------------------------------------------
import logging


def _configure_logging():
    """Attach a stream handler so CLI runs show progress output."""
    logger = logging.getLogger("mass_s3")
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s'))
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)


if __name__ == "__main__":
    from .cli import cli

    _configure_logging()
    cli()


# ---------------------------------------------------------------------------
# mass_s3/cli.py
# ---------------------------------------------------------------------------
import datetime
import logging

import click

from . import __version__
from .mass_s3 import MassS3


@click.group()
def cli():
    """
    Tool for running an operation on a bunch of objects in S3 a little quicker
    """


@cli.command()
@click.argument('bucket')
@click.argument('path')
@click.argument('new_acl')
@click.option('-m', '--modified-since', 'modified_since', default=None,
              required=False,
              help='Only touch objects modified on/after this UTC date '
                   '(YYYY-MM-DD).')
@click.option('-o', '--owner', 'owner', default=None, required=False,
              help='Only touch objects owned by this canonical owner ID.')
def put_object_acl(bucket, path, new_acl, modified_since, owner):
    """Find all files in BUCKET prefixed with PATH and apply NEW_ACL.
    """
    logger = logging.getLogger("mass_s3")
    logger.info("Starting")

    if modified_since is not None:
        parts = modified_since.split('-')
        if len(parts) != 3:
            raise click.BadParameter('expected YYYY-MM-DD',
                                     param_hint='--modified-since')
        try:
            # Midnight UTC of the given date; S3 LastModified is tz-aware,
            # so the comparison value must be aware too.
            modified_since = datetime.datetime(
                int(parts[0]), int(parts[1]), int(parts[2]),
                tzinfo=datetime.timezone.utc)
        except ValueError as err:
            raise click.BadParameter(str(err),
                                     param_hint='--modified-since') from err

    # S3 keys carry no leading slash; tolerate a single one for convenience.
    if path.startswith('/'):
        path = path[1:]

    filters = {}
    if modified_since is not None:
        filters['modified_since'] = modified_since
    if owner is not None:
        filters['owner'] = owner

    MassS3().put_object_acl(bucket, path, new_acl, filters)

    logger.info("Done")


@cli.command()
def version():
    """Display the program version
    """
    print(__version__)


# ---------------------------------------------------------------------------
# mass_s3/mass_s3.py
# ---------------------------------------------------------------------------
import logging
import os

import boto3


class MassS3:
    """Fan S3 object operations out across several forked worker processes."""

    def __init__(self):
        self.s3 = boto3.client('s3')
        self.logger = logging.getLogger("mass_s3")

    def put_object_acl(self, bucket, path, new_acl, filters):
        """Apply `new_acl` to every object in `bucket` under prefix `path`.

        `filters` may contain `modified_since` (tz-aware datetime) and/or
        `owner` (canonical owner ID); see `_list_objects`.
        """
        objects = self._list_objects(bucket, path, **filters)
        self._fork(self._put_object_acl, objects,
                   {"bucket": bucket, "new_acl": new_acl})

    def _put_object_acl(self, objects, bucket, new_acl):
        """Worker body: set `new_acl` on each key in `objects`."""
        self.logger.info("_put_object_acl on %d objects in %s setting %s",
                         len(objects), bucket, new_acl)
        for key in objects:
            self.s3.put_object_acl(ACL=new_acl, Bucket=bucket, Key=key)
            self.logger.debug("PutObjectAcl(ACL=%s, Bucket=%s, Key=%s)",
                              new_acl, bucket, key)

    def _fork(self, callback, objects, args=None, count=4):
        """Split `objects` into up to `count` chunks, run `callback` on each
        chunk in a forked child process, and wait for all children.

        POSIX-only (uses os.fork).  `args` holds extra keyword arguments for
        `callback`; the default is None rather than `{}` because mutable
        default arguments are shared across calls.
        """
        if args is None:
            args = {}

        self.logger.info("Forking %d processes to process %d objects.",
                         count, len(objects))

        if not objects:
            self.logger.warning("No objects matched. Doing nothing.")
            return

        if len(objects) < count:
            count = len(objects)
            self.logger.debug("Actually, limiting to %d processes "
                              "(one per object).", count)

        per_process = len(objects) // count
        children = []

        for i in range(count):
            if i == count - 1:
                # Last worker also takes the remainder of the division.
                my_objects = objects[i * per_process:]
            else:
                my_objects = objects[i * per_process:(i + 1) * per_process]

            pid = os.fork()
            if pid == 0:
                # Child: do the work, then leave via os._exit so we never
                # unwind back into the parent's code.  Plain exit() raises
                # SystemExit (which a caller could swallow) and would run
                # the parent's atexit handlers inside the child.
                status = 0
                try:
                    callback(my_objects, **args)
                except Exception:
                    self.logger.exception("Worker #%d failed", i)
                    status = 1
                finally:
                    os._exit(status)

            children.append(pid)
            self.logger.info("Forked child #%d as pid %d", i, pid)

        for pid in children:
            os.waitpid(pid, 0)

    def _list_objects(self, bucket, path, modified_since=None, owner=None):
        """Return every key in `bucket` under prefix `path`.

        modified_since -- keep only objects with LastModified >= this value
                          (must be timezone-aware, like the API's values).
        owner          -- keep only objects whose canonical Owner ID matches;
                          sets FetchOwner because ListObjectsV2 omits Owner
                          data by default.
        Follows ListObjectsV2 pagination via NextContinuationToken until the
        listing is exhausted (a page with no 'Contents' no longer ends the
        walk early).
        """
        objects = []
        args = {'Bucket': bucket, 'Prefix': path}
        if owner is not None:
            args['FetchOwner'] = True

        while True:
            response = self.s3.list_objects_v2(**args)

            for item in response.get('Contents', []):
                if (modified_since is not None
                        and item['LastModified'] < modified_since):
                    continue
                if owner is not None and item['Owner']['ID'] != owner:
                    continue
                objects.append(item['Key'])

            token = response.get('NextContinuationToken')
            if token is None:
                break
            args['ContinuationToken'] = token

        return objects