import logging
import os
import sys
from datetime import datetime, timezone
from pprint import pprint

import boto3


class MassS3:
    """Apply S3 operations to many objects at once by fanning out over forked processes.

    The instance holds one boto3 S3 client for listing. Every forked child
    builds its own client (see ``_run_child``) because a client's connection
    pool must not be shared across processes.
    """

    def __init__(self):
        self.s3 = boto3.client('s3')
        self.logger = logging.getLogger("mass_s3")

    def put_object_acl(self, bucket, path, new_acl, filters=None):
        """Set the canned ACL ``new_acl`` on every matching object under ``path``.

        Args:
            bucket: Name of the S3 bucket.
            path: Key prefix; only objects whose key starts with it are touched.
            new_acl: Canned ACL string sent as ``ACL`` to ``PutObjectAcl``.
            filters: Optional dict of keyword filters forwarded to
                ``_list_objects`` (``modified_since`` and/or ``owner``).
                ``None`` means no filtering.
        """
        objects = self._list_objects(bucket, path, **(filters or {}))
        self._fork(self._put_object_acl, objects, {
            "bucket": bucket,
            "new_acl": new_acl,
        })

    def _put_object_acl(self, objects, bucket, new_acl):
        """Worker: call ``PutObjectAcl`` with ``new_acl`` for each key in ``objects``.

        Invoked by ``_fork`` inside a child process, one chunk of keys per child.
        """
        self.logger.info("_put_object_acl on %d objects in %s setting %s",
                         len(objects), bucket, new_acl)
        for key in objects:
            self.s3.put_object_acl(ACL=new_acl, Bucket=bucket, Key=key)
            self.logger.debug("PutObjectAcl(ACL=%s, Bucket=%s, Key=%s)",
                              new_acl, bucket, key)

    @staticmethod
    def _split(objects, count):
        """Split ``objects`` into ``count`` contiguous chunks whose sizes differ by at most one."""
        base, extra = divmod(len(objects), count)
        chunks = []
        start = 0
        for i in range(count):
            end = start + base + (1 if i < extra else 0)
            chunks.append(objects[start:end])
            start = end
        return chunks

    def _fork(self, callback, objects, args=None, count=4):
        """Run ``callback(chunk, **args)`` in up to ``count`` forked child processes.

        ``objects`` is split into contiguous, near-equal chunks, one per child.
        The parent blocks until every child has exited and logs an error for
        any child whose wait status is non-zero.

        Args:
            callback: Callable invoked in each child as ``callback(chunk, **args)``.
            objects: Sequence of items to distribute among the children.
            args: Extra keyword arguments for ``callback``; ``None`` means none.
            count: Maximum number of child processes; must be at least 1.

        Raises:
            ValueError: If ``objects`` is non-empty and ``count`` is less than 1.
        """
        args = {} if args is None else args
        self.logger.info("Forking %d processes to process %d objects.",
                         count, len(objects))
        if not objects:
            self.logger.warning("No objects matched. Doing nothing.")
            return
        if count < 1:
            raise ValueError("count must be at least 1, got %r" % (count,))
        if len(objects) < count:
            count = len(objects)
            self.logger.debug("Actually, limiting to %d processes (one per object).", count)

        # Flush pending output so children don't re-emit the parent's buffered writes.
        sys.stdout.flush()
        sys.stderr.flush()

        children = []
        for i, chunk in enumerate(self._split(objects, count)):
            pid = os.fork()
            if pid == 0:
                self._run_child(callback, chunk, args)  # never returns
            children.append(pid)
            self.logger.info("Forked child #%d as pid %d", i, pid)

        for pid in children:
            _, status = os.waitpid(pid, 0)
            if status != 0:
                self.logger.error("Child pid %d exited abnormally (wait status %d)",
                                  pid, status)

    def _run_child(self, callback, chunk, args):
        """Child-process body: run ``callback`` on ``chunk`` and terminate the process.

        Ends with ``os._exit`` so the child never returns into the parent's code
        path (even if ``callback`` raises) and never runs the parent's atexit
        handlers. Exit status is 0 on success, 1 on any failure.
        """
        status = 1
        try:
            # A boto3 client's connection pool must not be shared across fork():
            # children would otherwise interleave requests on the parent's sockets.
            # Bound-method callbacks on this instance pick up the fresh client.
            self.s3 = boto3.client('s3')
            callback(chunk, **args)
            status = 0
        except Exception:
            self.logger.exception("Child pid %d failed", os.getpid())
        finally:
            try:
                # os._exit skips normal interpreter cleanup, so flush explicitly.
                logging.shutdown()
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(status)

    def _list_objects(self, bucket, path, modified_since=None, owner=None):
        """Return the keys under ``path`` in ``bucket`` that pass the optional filters.

        Follows ``list_objects_v2`` pagination until no continuation token remains.

        Args:
            bucket: Bucket name.
            path: Key prefix, passed as ``Prefix``.
            modified_since: If given, keep only objects whose ``LastModified`` is
                not earlier than this datetime. A naive datetime is treated as UTC
                (NOTE(review): assumption, confirm with callers), because boto3
                returns timezone-aware values and comparing naive with aware
                datetimes raises TypeError.
            owner: If given, keep only objects whose owner ``DisplayName`` equals
                it. ``FetchOwner`` is requested so the listing includes owners.
                Objects with no reported display name are excluded.

        Returns:
            List of matching object keys, in listing order.
        """
        params = {'Bucket': bucket, 'Prefix': path}
        if owner is not None:
            params['FetchOwner'] = True
        if isinstance(modified_since, datetime) and modified_since.tzinfo is None:
            modified_since = modified_since.replace(tzinfo=timezone.utc)

        keys = []
        total = 0
        while True:
            response = self.s3.list_objects_v2(**params)
            contents = response.get('Contents', [])
            for item in contents:
                if modified_since is not None and item['LastModified'] < modified_since:
                    continue
                # S3 may omit Owner/DisplayName, so don't index blindly.
                if owner is not None and (item.get('Owner') or {}).get('DisplayName') != owner:
                    continue
                keys.append(item['Key'])
            total += len(contents)
            self.logger.debug('Got %d objects; kept %d', total, len(keys))
            token = response.get('NextContinuationToken')
            if not token:
                break
            params['ContinuationToken'] = token
        return keys