You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
96 lines
3.2 KiB
96 lines
3.2 KiB
import boto3
|
|
import os
|
|
import logging
|
|
from pprint import pprint
|
|
|
|
class MassS3:
    """Run an S3 operation over many objects, split across forked worker processes."""

    def __init__(self):
        """Create the S3 client and the ``mass_s3`` logger shared by all operations."""
        self.s3 = boto3.client('s3')
        self.logger = logging.getLogger("mass_s3")

    def put_object_acl(self, bucket, path, new_acl, filters=None):
        """Set ``new_acl`` on every matching object under ``path`` in ``bucket``.

        Args:
            bucket: Name of the bucket to operate on.
            path: Key prefix used to list the objects.
            new_acl: Value passed as ``ACL`` to each PutObjectAcl call.
            filters: Optional dict of keyword filters forwarded to
                ``_list_objects`` (``modified_since``, ``owner``). ``None`` or
                an empty dict means no filtering.
        """
        objects = self._list_objects(bucket, path, **(filters or {}))
        self._fork(self._put_object_acl, objects, {"bucket": bucket, "new_acl": new_acl})

    def _put_object_acl(self, objects, bucket, new_acl):
        """Worker body: apply ``new_acl`` to each key in ``objects``, one call per key."""
        self.logger.info(
            "_put_object_acl on %s objects in %s setting %s",
            len(objects), bucket, new_acl,
        )
        for key in objects:
            self.s3.put_object_acl(ACL=new_acl, Bucket=bucket, Key=key)
            self.logger.debug(
                "PutObjectAcl(ACL=%s, Bucket=%s, Key=%s)", new_acl, bucket, key
            )

    def _fork(self, callback, objects, args=None, count=4):
        """Split ``objects`` across forked children and wait for all of them.

        Each child calls ``callback(chunk, **args)`` and then terminates; the
        parent blocks until every child has exited. Children that fail are
        reported through the logger at ERROR level.

        Args:
            callback: Callable invoked in each child with its share of objects.
            objects: List of items to distribute. An empty list is a no-op.
            args: Optional dict of extra keyword arguments for ``callback``.
            count: Maximum number of children; capped at ``len(objects)``.

        Raises:
            ValueError: If ``count`` is smaller than 1 and there is work to do.
        """
        self.logger.info(
            "Forking %s processes to process %s objects.", count, len(objects)
        )

        if not objects:
            self.logger.warning("No objects matched. Doing nothing.")
            return

        if count < 1:
            raise ValueError("count must be at least 1, got %r" % (count,))

        if len(objects) < count:
            count = len(objects)
            self.logger.debug(
                "Actually, limiting to %s processes (one per object).", count
            )

        args = {} if args is None else args
        per_process = len(objects) // count
        children = []

        try:
            for index in range(count):
                start = index * per_process
                # The last worker also takes whatever the integer division left over.
                end = None if index == count - 1 else start + per_process
                chunk = objects[start:end]

                pid = os.fork()
                if pid == 0:
                    # Child process: never returns from this call.
                    self._run_child(callback, chunk, args, index)

                children.append(pid)
                self.logger.info("Forked child #%d as pid %d", index, pid)
        finally:
            # Reap whatever was started, even if a later fork() failed, so no
            # child is left behind unwaited.
            for index, pid in enumerate(children):
                _, status = os.waitpid(pid, 0)
                if status != 0:
                    self.logger.error(
                        "Child #%d (pid %d) exited abnormally (wait status %d)",
                        index, pid, status,
                    )

    def _run_child(self, callback, objects, args, index):
        """Run ``callback`` inside a forked child and hard-exit; never returns."""
        import sys

        status = 1
        try:
            callback(objects, **args)
            status = 0
        except Exception:
            self.logger.exception("Child #%d failed", index)
        finally:
            # os._exit() is the documented way to leave a forked child: unlike
            # exit()/sys.exit() it raises nothing, so the child can never unwind
            # into the parent's copied call stack. It also skips interpreter
            # cleanup, hence the explicit flushes beforehand.
            try:
                logging.shutdown()
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(status)

    def _list_objects(self, bucket, path, modified_since=None, owner=None):
        """Return the keys under ``path`` in ``bucket`` that pass the filters.

        Follows ListObjectsV2 continuation tokens until the listing is done.

        Args:
            bucket: Name of the bucket to list.
            path: Key prefix to list.
            modified_since: If given, objects whose ``LastModified`` is earlier
                are skipped. It is compared directly with the value returned by
                the client, so it must be comparable with it (boto3 returns
                timezone-aware datetimes).
            owner: If given, only objects whose owner ``DisplayName`` or ``ID``
                equals this value are kept. Objects without owner data never
                match.

        Returns:
            List of matching object keys, in listing order.
        """
        objects = []
        args = {'Bucket': bucket, 'Prefix': path}

        if owner is not None:
            # Owner data is only returned when explicitly requested.
            args['FetchOwner'] = True

        total = 0
        while True:
            response = self.s3.list_objects_v2(**args)
            contents = response.get('Contents')

            if contents is not None:
                for item in contents:
                    if modified_since is not None and item['LastModified'] < modified_since:
                        continue
                    if owner is not None:
                        # DisplayName is not returned in every region, so read
                        # it defensively and accept the canonical ID as well.
                        item_owner = item.get('Owner') or {}
                        if owner not in (item_owner.get('DisplayName'), item_owner.get('ID')):
                            continue
                    objects.append(item['Key'])

                total += len(contents)
                self.logger.debug('Got %s objects; kept %s', total, len(objects))

            token = response.get('NextContinuationToken')
            if not token:
                break
            args['ContinuationToken'] = token

        return objects
|
|
|
|
|
|
|