Shift data around between different data stores.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

118 lines
2.4 KiB

namespace DbCopy;
use DbCopy\Driver\IDriver;
/**
 * Copies records from a source driver to a destination driver in batches,
 * optionally passing every record through a transform callback first.
 *
 * Options accepted by the constructor / create():
 *  - limit:           maximum number of records to write. null (the default)
 *                     or any non-positive value means "no limit".
 *  - batch:           number of records read before each write (default 2500).
 *                     A non-positive value reads everything as a single batch.
 *  - status_interval: minimum number of seconds between progress lines echoed
 *                     to standard output (default 5). null disables them.
 */
class Copy
{
    private $_options;
    private $_src = null, $_srcParams = [];
    private $_dst = null, $_dstParams = [];
    private $_transform = null;
    private $_status_lastUpdate, $_status_lastCount;

    /**
     * @param array $params Option overrides; merged over the defaults listed
     *                      in the class description.
     */
    public function __construct(array $params = [])
    {
        $this->_options = array_merge([
            'limit' => null,
            'batch' => 2500,
            'status_interval' => 5
        ], $params);
    }

    /**
     * Fluent factory, equivalent to `new Copy($params)`.
     *
     * @param array $params Option overrides (see constructor).
     */
    public static function create(array $params = []): Copy
    {
        return new self($params);
    }

    /**
     * Sets the driver records are read from.
     *
     * @param IDriver $source Driver whose get() is called with $params to obtain a cursor.
     * @param array   $params Passed verbatim to the source driver's get().
     */
    public function source(IDriver $source, array $params = []): Copy
    {
        $this->_src = $source;
        $this->_srcParams = $params;
        return $this;
    }

    /**
     * Sets the driver records are written to.
     *
     * @param IDriver $destination Driver receiving put() / putBatch() calls.
     * @param array   $params      Passed verbatim as the second argument of every write.
     */
    public function destination(IDriver $destination, array $params = []): Copy
    {
        $this->_dst = $destination;
        $this->_dstParams = $params;
        return $this;
    }

    /**
     * Registers a callback applied to every record before it is written.
     * Records for which the callback returns a falsy value are dropped.
     *
     * @param callable $callback Receives one record, returns the record to write.
     */
    public function transform(callable $callback): Copy
    {
        $this->_transform = $callback;
        return $this;
    }

    /**
     * Runs the copy: read a batch, transform it, write it, repeat until the
     * source cursor is exhausted or the configured limit has been written.
     *
     * @return int Number of records written to the destination.
     *
     * @throws \LogicException If source() or destination() has not been called.
     */
    public function execute(): int
    {
        if ($this->_src === null || $this->_dst === null)
        {
            throw new \LogicException('Both source() and destination() must be set before execute().');
        }

        $cursor = $this->_src->get($this->_srcParams);
        $limit = $this->_effectiveLimit();
        $count = 0;

        // Restart the progress clock for this run so that the first status
        // line reports a real rate and a repeated execute() does not inherit
        // counters from the previous one.
        $this->_status_lastUpdate = time();
        $this->_status_lastCount = 0;

        // Checking the limit before every read is what actually enforces it:
        // without this, the batch following the one that reached the limit
        // would simply keep reading.
        while ($limit === null || $count < $limit)
        {
            $records = $this->_readBatch($cursor, $count, $limit);
            if (!$records)
            {
                // Done copying!
                break;
            }

            $records = $this->_transformBatch($records);
            $this->_writeBatch($records);
            $count += count($records);

            $this->_reportStatus($count);
        }

        return $count;
    }

    /**
     * Normalises the 'limit' option: a positive number, or null for "unlimited".
     */
    private function _effectiveLimit()
    {
        $limit = $this->_options['limit'];
        return (is_numeric($limit) && $limit > 0) ? $limit : null;
    }

    /**
     * Reads records from the cursor until the batch is full, the limit would
     * be reached, or the cursor returns a falsy value (treated as exhaustion).
     *
     * @param object   $cursor Object returned by the source driver's get();
     *                         its own get() must yield one record per call.
     * @param int      $count  Records already written in this run.
     * @param int|null $limit  Effective limit, or null when unlimited.
     *
     * @return array List of records read (empty when the cursor is exhausted).
     */
    private function _readBatch($cursor, int $count, $limit): array
    {
        $batch = $this->_options['batch'];
        $records = [];

        while ($record = $cursor->get())
        {
            $records[] = $record;
            $read = count($records);

            $batchFull = $batch > 0 && $read >= $batch;
            $limitHit = $limit !== null && $count + $read >= $limit;
            if ($batchFull || $limitHit)
            {
                break;
            }
        }

        return $records;
    }

    /**
     * Applies the transform callback (if any) and drops falsy results.
     *
     * Iterates by value into a fresh list: a by-reference foreach would leave
     * a dangling reference that corrupts the last record in later loops, and
     * building a new list keeps the keys sequential for putBatch().
     *
     * @param array $records Records read for the current batch.
     *
     * @return array Re-indexed list of records to write.
     */
    private function _transformBatch(array $records): array
    {
        $transform = $this->_transform;
        if (!is_callable($transform))
        {
            return $records;
        }

        $kept = [];
        foreach ($records as $record)
        {
            $result = $transform($record);
            if ($result)
            {
                $kept[] = $result;
            }
        }

        return $kept;
    }

    /**
     * Writes one batch, using the destination's putBatch() when it exists and
     * batching is enabled, otherwise one put() per record.
     *
     * @param array $records Records to write; nothing is written when empty.
     */
    private function _writeBatch(array $records)
    {
        if (!$records)
        {
            // Every record of this batch was filtered out by the transform.
            return;
        }

        if (method_exists($this->_dst, 'putBatch') && $this->_options['batch'] > 1)
        {
            $this->_dst->putBatch($records, $this->_dstParams);
            return;
        }

        // Otherwise write one by one
        foreach ($records as $record)
        {
            $this->_dst->put($record, $this->_dstParams);
        }
    }

    /**
     * Echoes a progress line when more than 'status_interval' seconds have
     * passed since the previous one (or since execute() started).
     *
     * @param int $count Records written so far in this run.
     */
    private function _reportStatus(int $count)
    {
        if (!isset($this->_options['status_interval']))
        {
            return;
        }

        $elapsed = time() - $this->_status_lastUpdate;
        // $elapsed < 1 also protects the division below when the interval is
        // zero or negative.
        if ($elapsed < 1 || $elapsed <= $this->_options['status_interval'])
        {
            return;
        }

        $processed = $count - $this->_status_lastCount;
        $pps = round($processed / $elapsed, 0);
        echo "Complete: ".$count."; Records/sec: ".$pps.PHP_EOL;

        $this->_status_lastUpdate = time();
        $this->_status_lastCount = $count;
    }
}