You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
117 lines
2.4 KiB
117 lines
2.4 KiB
<?php
|
|
|
|
namespace DbCopy;
|
|
use DbCopy\Driver\IDriver;
|
|
|
|
class Copy
|
|
{
|
|
private $_options;
|
|
|
|
private $_src = null, $_srcParams = [];
|
|
private $_dst = null, $_dstParams = [];
|
|
private $_transform = null;
|
|
|
|
private $_status_lastUpdate, $_status_lastCount;
|
|
|
|
public function __construct(array $params = [])
|
|
{
|
|
$this->_options = array_merge([
|
|
'limit' => null,
|
|
'batch' => 2500,
|
|
'status_interval' => 5
|
|
], $params);
|
|
}
|
|
|
|
public static function create(array $params = []): Copy
|
|
{
|
|
return new Copy($params);
|
|
}
|
|
|
|
public function source(IDriver $source, array $params = []): Copy
|
|
{
|
|
$this->_src = $source;
|
|
$this->_srcParams = $params;
|
|
return $this;
|
|
}
|
|
|
|
public function destination(IDriver $destination, array $params = []): Copy
|
|
{
|
|
$this->_dst = $destination;
|
|
$this->_dstParams = $params;
|
|
return $this;
|
|
}
|
|
|
|
public function transform(callable $callback): Copy
|
|
{
|
|
$this->_transform = $callback;
|
|
return $this;
|
|
}
|
|
|
|
public function execute(): int
|
|
{
|
|
$cursor = $this->_src->get($this->_srcParams);
|
|
$transform = $this->_transform;
|
|
$count = 0;
|
|
|
|
for (;;)
|
|
{
|
|
// Read records
|
|
$records = [];
|
|
$batchSize = 0;
|
|
while ($record = $cursor->get())
|
|
{
|
|
$records[] = $record;
|
|
$batchSize++;
|
|
if ($batchSize == $this->_options['batch'] ||
|
|
$count + $batchSize == $this->_options['limit'])
|
|
break;
|
|
}
|
|
|
|
if ($batchSize == 0)
|
|
{
|
|
// Done copying!
|
|
return $count;
|
|
}
|
|
|
|
// Transform records if applicable
|
|
if (is_callable($transform))
|
|
{
|
|
foreach ($records as &$record)
|
|
{
|
|
$record = $transform($record);
|
|
}
|
|
unset($record);
|
|
$records = array_filter($records);
|
|
}
|
|
|
|
// Write records
|
|
if (method_exists($this->_dst, 'putBatch') && $this->_options['batch'] > 1)
|
|
{
|
|
$this->_dst->putBatch($records, $this->_dstParams);
|
|
}
|
|
else
|
|
{
|
|
// Otherwise write one by one
|
|
foreach ($records as $record)
|
|
{
|
|
$this->_dst->put($record, $this->_dstParams);
|
|
}
|
|
}
|
|
|
|
$count += sizeof($records);
|
|
|
|
if (isset($this->_options['status_interval']) &&
|
|
$this->_status_lastUpdate + $this->_options['status_interval'] < time())
|
|
{
|
|
$processed = $count - $this->_status_lastCount;
|
|
$pps = round($processed / (time() - $this->_status_lastUpdate), 0);
|
|
echo "Complete: ".$count."; Records/sec: ".$pps.PHP_EOL;
|
|
$this->_status_lastUpdate = time();
|
|
$this->_status_lastCount = $count;
|
|
}
|
|
}
|
|
|
|
return $count;
|
|
}
|
|
|
|
}
|
|
|