Shift data around between different data stores.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

117 lines
2.4 KiB

<?php
namespace DbCopy;
use DbCopy\Driver\IDriver;
class Copy
{
private $_options;
private $_src = null, $_srcParams = [];
private $_dst = null, $_dstParams = [];
private $_transform = null;
private $_status_lastUpdate, $_status_lastCount;
public function __construct(array $params = [])
{
$this->_options = array_merge([
'limit' => null,
'batch' => 2500,
'status_interval' => 5
], $params);
}
public static function create(array $params = []): Copy
{
return new Copy($params);
}
public function source(IDriver $source, array $params = []): Copy
{
$this->_src = $source;
$this->_srcParams = $params;
return $this;
}
public function destination(IDriver $destination, array $params = []): Copy
{
$this->_dst = $destination;
$this->_dstParams = $params;
return $this;
}
public function transform(callable $callback): Copy
{
$this->_transform = $callback;
return $this;
}
public function execute(): int
{
$cursor = $this->_src->get($this->_srcParams);
$transform = $this->_transform;
$count = 0;
for (;;)
{
// Read records
$records = [];
$batchSize = 0;
while ($record = $cursor->get())
{
$records[] = $record;
$batchSize++;
if ($batchSize == $this->_options['batch'] ||
$count + $batchSize == $this->_options['limit'])
break;
}
if ($batchSize == 0)
{
// Done copying!
return $count;
}
// Transform records if applicable
if (is_callable($transform))
{
foreach ($records as &$record)
{
$record = $transform($record);
}
unset($record);
$records = array_filter($records);
}
// Write records
if (method_exists($this->_dst, 'putBatch') && $this->_options['batch'] > 1)
{
$this->_dst->putBatch($records, $this->_dstParams);
}
else
{
// Otherwise write one by one
foreach ($records as $record)
{
$this->_dst->put($record, $this->_dstParams);
}
}
$count += sizeof($records);
if (isset($this->_options['status_interval']) &&
$this->_status_lastUpdate + $this->_options['status_interval'] < time())
{
$processed = $count - $this->_status_lastCount;
$pps = round($processed / (time() - $this->_status_lastUpdate), 0);
echo "Complete: ".$count."; Records/sec: ".$pps.PHP_EOL;
$this->_status_lastUpdate = time();
$this->_status_lastCount = $count;
}
}
return $count;
}
}