Adam Pippin
4 years ago
commit
eb02bc55b6
4 changed files with 172 additions and 0 deletions
@ -0,0 +1,7 @@ |
|||
# Package initializer for mass_s3: metadata plus library-safe logging setup.

import logging

__author__ = "Adam Pippin"
__email__ = "hello@adampippin.ca"
__version__ = "0.0.1"

# Attach a NullHandler so importing the package never emits
# "No handler could be found" warnings; applications that want log
# output configure the "mass_s3" logger themselves (see __main__).
logging.getLogger("mass_s3").addHandler(logging.NullHandler())
@ -0,0 +1,15 @@ |
|||
import logging |
|||
|
|||
if __name__ == "__main__":
    from .cli import cli

    # Route all "mass_s3" package logging to stderr with timestamps,
    # at full DEBUG verbosity, before handing control to the CLI.
    log = logging.getLogger("mass_s3")
    stream = logging.StreamHandler()
    stream.setFormatter(logging.Formatter(
        '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'))
    log.addHandler(stream)
    log.setLevel(logging.DEBUG)

    cli()
|||
|
@ -0,0 +1,58 @@ |
|||
import click |
|||
import logging |
|||
import datetime |
|||
from pprint import pprint |
|||
|
|||
from . import __version__ |
|||
from .mass_s3 import MassS3 |
|||
|
|||
@click.group()
def cli():
    """
    Tool for running an operation on a bunch of objects in S3 a little quicker
    """
    # Group entry point only; subcommands (put_object_acl, version)
    # register themselves via the @cli.command() decorator.
    pass
|||
|
|||
def _parse_modified_since(value):
    """Parse a 'YYYY-MM-DD' string into a timezone-aware UTC datetime.

    Raises ValueError (a subclass of the Exception previously raised
    here, so existing broad handlers still work) when the string is not
    a valid date.
    """
    try:
        parsed = datetime.datetime.strptime(value, '%Y-%m-%d')
    except ValueError as err:
        raise ValueError('Could not parse modified_since') from err
    return parsed.replace(tzinfo=datetime.timezone.utc)


@cli.command()
@click.argument('bucket')
@click.argument('path')
@click.argument('new_acl')
@click.option('-m', '--modified-since', 'modified_since', default=None, required=False)
@click.option('-o', '--owner', 'owner', default=None, required=False)
def put_object_acl(bucket, path, new_acl, modified_since, owner):
    """Find all files in BUCKET prefixed with PATH and apply NEW_ACL.
    """
    logger = logging.getLogger("mass_s3")
    logger.info("Starting")

    if modified_since is not None:
        # strptime validates the whole date (rejects e.g. month 13),
        # unlike the previous manual split-and-int approach.
        modified_since = _parse_modified_since(modified_since)

    # S3 keys have no leading slash; strip a single one so
    # "/some/prefix" and "some/prefix" behave identically.
    if path.startswith('/'):
        path = path[1:]

    # Forward only the filters the user actually supplied.
    filters = {}
    if modified_since is not None:
        filters['modified_since'] = modified_since
    if owner is not None:
        filters['owner'] = owner

    ms3 = MassS3()
    ms3.put_object_acl(bucket, path, new_acl, filters)

    logger.info("Done")
|||
|
|||
|
|||
|
|||
|
|||
|
|||
@cli.command()
def version():
    """Display the program version
    """
    # __version__ is defined in the package __init__ and imported at
    # the top of this module.
    print(__version__)
|||
|
@ -0,0 +1,92 @@ |
|||
import boto3 |
|||
import os |
|||
import logging |
|||
from pprint import pprint |
|||
|
|||
class MassS3:
    """Applies bulk operations to S3 objects using forked worker processes."""

    def __init__(self):
        self.s3 = boto3.client('s3')
        self.logger = logging.getLogger("mass_s3")

    def put_object_acl(self, bucket, path, new_acl, filters):
        """Apply new_acl to every object in bucket under the prefix path.

        filters may contain 'modified_since' (a tz-aware datetime) and/or
        'owner' (a canonical owner ID) to narrow the object set; see
        _list_objects for their semantics.
        """
        objects = self._list_objects(bucket, path, **filters)
        self._fork(self._put_object_acl, objects,
                   {"bucket": bucket, "new_acl": new_acl})

    def _put_object_acl(self, objects, bucket, new_acl):
        """Worker body: set the ACL on each key in objects (runs in a child)."""
        # Lazy %-style args: formatting is skipped if the level is disabled.
        self.logger.info("_put_object_acl on %d objects in %s setting %s",
                         len(objects), bucket, new_acl)
        for key in objects:  # renamed from `object`, which shadowed the builtin
            self.s3.put_object_acl(
                ACL=new_acl,
                Bucket=bucket,
                Key=key,
            )
            self.logger.debug("PutObjectAcl(ACL=%s, Bucket=%s, Key=%s)",
                              new_acl, bucket, key)

    def _fork(self, callback, objects, args=None, count=4):
        """Split objects across up to count forked children.

        Each child runs callback(chunk, **args) and terminates; the parent
        blocks until every child has been reaped. Returns None.
        """
        # args defaults to None, not {}: a mutable default would be shared
        # across all calls.
        if args is None:
            args = {}

        self.logger.info("Forking %d processes to process %d objects.",
                         count, len(objects))

        if not objects:
            self.logger.warning("No objects matched. Doing nothing.")
            return

        if len(objects) < count:
            count = len(objects)
            self.logger.debug(
                "Actually, limiting to %d processes (one per object).", count)

        per_process = len(objects) // count
        children = []

        for i in range(count):
            if i == count - 1:
                # Last worker, give it any leftovers too
                my_objects = objects[i * per_process:]
            else:
                my_objects = objects[i * per_process:(i + 1) * per_process]

            pid = os.fork()

            if pid == 0:
                # Child: do the work, then terminate with os._exit so we
                # never raise back into the parent's code path (the old
                # exit(0) raised SystemExit, and a callback exception would
                # have let the child keep executing the parent's loop).
                status = 0
                try:
                    callback(my_objects, **args)
                except Exception:
                    self.logger.exception("Worker failed")
                    status = 1
                finally:
                    os._exit(status)
            else:
                children.append(pid)
                self.logger.info("Forked child #%d as pid %d", i, pid)

        # Reap every child so no zombies are left behind.
        for child_pid in children:
            os.waitpid(child_pid, 0)

    def _list_objects(self, bucket, path, modified_since=None, owner=None):
        """Return all keys in bucket under prefix path.

        Paginates through ListObjectsV2 and applies optional client-side
        filters: keep only objects modified at or after modified_since,
        and/or owned by the given canonical owner ID.
        """
        objects = []

        args = {
            'Bucket': bucket,
            'Prefix': path,
        }

        # Owner metadata is only included in the response when requested.
        if owner is not None:
            args['FetchOwner'] = True

        while True:
            response = self.s3.list_objects_v2(**args)

            # An empty result page carries no 'Contents' key at all.
            if 'Contents' not in response:
                break

            for item in response['Contents']:
                if modified_since is not None and item['LastModified'] < modified_since:
                    continue
                if owner is not None and item['Owner']['ID'] != owner:
                    continue
                objects.append(item['Key'])

            if 'NextContinuationToken' in response:
                args['ContinuationToken'] = response['NextContinuationToken']
            else:
                break

        return objects
|||
|
|||
|
Loading…
Reference in new issue