Browse Source

basic functionality

master
Adam Pippin 3 years ago
commit
adc0899acc
  1. 5
      composer.json
  2. 64
      db_copy.php
  3. 117
      src/Copy.php
  4. 44
      src/Cursor.php
  5. 114
      src/Driver/Elasticsearch.php
  6. 12
      src/Driver/IDriver.php
  7. 121
      src/Driver/Postgres.php
  8. 9
      src/ICursor.php
  9. 31
      src/Transform/FieldMap.php
  10. 29
      src/Transform/Group.php
  11. 8
      src/Transform/Transform.php
  12. 30
      src/Transform/ValueMap.php

5
composer.json

@ -0,0 +1,5 @@
{
"autoload": {
"psr-4": { "DbCopy\\": "src/" }
}
}

64
db_copy.php

@ -0,0 +1,64 @@
<?php
require('vendor/autoload.php');
if (!isset($argv[1]) || !file_exists($argv[1]))
{
die('Config file not specified or not found'.PHP_EOL);
}
$config = json_decode(file_get_contents($argv[1]), true);
if (!isset($config))
{
die('Invalid config file'.PHP_EOL);
}
$source_driver = $config['source']['driver'];
$source_config = $config['source']['config'];
$class = '\\DbCopy\\Driver\\'.$source_driver;
$source = new $class();
$source->open($source_config);
$target_driver = $config['target']['driver'];
$target_config = $config['target']['config'];
$class = '\\DbCopy\\Driver\\'.$target_driver;
$target = new $class();
$target->open($target_config);
$transforms = [];
if (isset($config['transform']))
{
foreach ($config['transform'] as $transform)
{
$transform_type = $transform['type'];
$transform_config = $transform['config'];
$class = '\\DbCopy\\Transform\\'.$transform_type;
$transforms[] = new $class($transform_config);
}
}
$filter_column = $config['filter']['column'];
$filter_values = $config['filter']['values'];
$copy_options = $config['options'];
$results = [];
foreach ($filter_values as $filter_value)
{
echo "===== COPYING $filter_column=$filter_value =====".PHP_EOL;
$copy = \DbCopy\Copy::create($copy_options);
// FIXME: This query thing is ES-specific
$copy->source($source, ['query'=>[$filter_column=>$filter_value]]);
$copy->destination($target);
if (!empty($transforms))
$copy->transform(\DbCopy\Transform\Group::create($transforms));
$count = $copy->execute();
echo "Copied: $count records".PHP_EOL;
$results[$filter_value] = $count;
}
var_dump($results);

117
src/Copy.php

@ -0,0 +1,117 @@
<?php
namespace DbCopy;
use DbCopy\Driver\IDriver;
class Copy
{
private $_options;
private $_src = null, $_srcParams = [];
private $_dst = null, $_dstParams = [];
private $_transform = null;
private $_status_lastUpdate, $_status_lastCount;
public function __construct(array $params = [])
{
$this->_options = array_merge([
'limit' => null,
'batch' => 2500,
'status_interval' => 5
], $params);
}
public static function create(array $params = []): Copy
{
return new Copy($params);
}
public function source(IDriver $source, array $params = []): Copy
{
$this->_src = $source;
$this->_srcParams = $params;
return $this;
}
public function destination(IDriver $destination, array $params = []): Copy
{
$this->_dst = $destination;
$this->_dstParams = $params;
return $this;
}
public function transform(callable $callback): Copy
{
$this->_transform = $callback;
return $this;
}
public function execute(): int
{
$cursor = $this->_src->get($this->_srcParams);
$transform = $this->_transform;
$count = 0;
for (;;)
{
// Read records
$records = [];
$batchSize = 0;
while ($record = $cursor->get())
{
$records[] = $record;
$batchSize++;
if ($batchSize == $this->_options['batch'] ||
$count + $batchSize == $this->_options['limit'])
break;
}
if ($batchSize == 0)
{
// Done copying!
return $count;
}
// Transform records if applicable
if (is_callable($transform))
{
foreach ($records as &$record)
{
$record = $transform($record);
}
unset($record);
$records = array_filter($records);
}
// Write records
if (method_exists($this->_dst, 'putBatch') && $this->_options['batch'] > 1)
{
$this->_dst->putBatch($records, $this->_dstParams);
}
else
{
// Otherwise write one by one
foreach ($records as $record)
{
$this->_dst->put($record, $this->_dstParams);
}
}
$count += sizeof($records);
if (isset($this->_options['status_interval']) &&
$this->_status_lastUpdate + $this->_options['status_interval'] < time())
{
$processed = $count - $this->_status_lastCount;
$pps = round($processed / (time() - $this->_status_lastUpdate), 0);
echo "Complete: ".$count."; Records/sec: ".$pps.PHP_EOL;
$this->_status_lastUpdate = time();
$this->_status_lastCount = $count;
}
}
return $count;
}
}

44
src/Cursor.php

@ -0,0 +1,44 @@
<?php
namespace DbCopy;
class Cursor implements ICursor
{
private $_closed;
private $_buffer;
private $_callback;
public function __construct(callable $callback)
{
$this->_callback = $callback;
$this->_closed = false;
$this->_buffer = [];
}
public function closed(): bool
{
return $this->_closed;
}
public function get(): ?array
{
if ($this->_closed)
{
return null;
}
if (sizeof($this->_buffer) == 0)
{
// Fill more data
$callback = $this->_callback;
$this->_buffer = $callback();
if (sizeof($this->_buffer) == 0)
{
$this->_closed = true;
return null;
}
}
return array_shift($this->_buffer);
}
}

114
src/Driver/Elasticsearch.php

@ -0,0 +1,114 @@
<?php
namespace DbCopy\Driver;
class Elasticsearch implements IDriver
{
private $_options;
public function open(array $params): void
{
$params = array_merge([
'endpoint' => null,
'index' => null,
'retries' => 10,
'size' => 5000,
'scroll_window' => '5m',
'query' => null,
'timeout' => 500000
], $params);
$params = filter_var_array($params, [
'endpoint' => ['filter'=>FILTER_VALIDATE_URL, 'flags'=>FILTER_REQUIRE_SCALAR],
'index' => ['filter'=>FILTER_SANITIZE_ENCODED, 'flags'=>FILTER_REQUIRE_SCALAR],
'retries' => ['filter'=>FILTER_VALIDATE_INT, 'flags'=>FILTER_REQUIRE_SCALAR],
'size' => ['filter'=>FILTER_VALIDATE_INT, 'flags'=>FILTER_REQUIRE_SCALAR],
'scroll_window' => ['filter'=>FILTER_UNSAFE_RAW, 'flags'=>FILTER_REQUIRE_SCALAR],
'query' => ['filter'=>FILTER_UNSAFE_RAW, 'flags'=>FILTER_REQUIRE_ARRAY]
]);
$this->_options = $params;
}
public function close(): void
{
// noop
}
public function get(array $params = []): \DbCopy\ICursor
{
$options = array_merge($this->_options, $params);
$state = (object)[
'scroll_id' => null,
];
return new \DbCopy\Cursor(function() use ($options, $state) {
$retry_timeout = 500000;
for ($i=0; $i<$options['retries']; $i++)
{
$ch = curl_init();
if (isset($state->scroll_id))
{
$body = json_encode(['scroll'=>$options['scroll_window'], 'scroll_id'=>$state->scroll_id]);
curl_setopt($ch, CURLOPT_URL, $options['endpoint'].'/_search/scroll');
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST');
curl_setopt($ch, CURLOPT_HTTPHEADER, [
'Content-Type: application/json',
'Content-Length: '.strlen($body)
]);
curl_setopt($ch, CURLOPT_POSTFIELDS, $body);
}
else
{
$query_string = [];
$query_string[] = 'scroll='.$options['scroll_window'];
$query_string[] = 'size='.$options['size'];
if (isset($options['query']) && $options['query'] !== false)
{
$search_params = [];
foreach ($options['query'] as $k=>$v)
{
$search_params[] = $k.':'.$v;
}
$query_string[] = 'q='.implode(' AND ', $search_params);
}
curl_setopt($ch, CURLOPT_URL, $options['endpoint'].'/'.$options['index'].'/_search?'.implode('&', $query_string));
}
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
$response = curl_exec($ch);
curl_close($ch);
if ($response === false)
{
usleep($timeout);
$timeout = $timeout * 2;
continue;
}
$response = json_decode($response, true);
if (!isset($response['_scroll_id']))
{
var_dump($response);
throw new \Exception("Elasticsearch response did not contain scroll id");
}
$state->scroll_id = $response['_scroll_id'];
return array_map(function($es_arr) {
return $es_arr['_source'];
}, $response['hits']['hits']);
}
throw new \Exception("Requested failed after ".$options['retries']." retries.");
});
}
public function put(array $record): void
{
throw new \Exception("Not implemented");
}
}

12
src/Driver/IDriver.php

@ -0,0 +1,12 @@
<?php
namespace DbCopy\Driver;
interface IDriver
{
public function open(array $params): void;
public function close(): void;
public function get(array $params = []): \DbCopy\ICursor;
public function put(array $record): void;
}

121
src/Driver/Postgres.php

@ -0,0 +1,121 @@
<?php
namespace DbCopy\Driver;
class Postgres implements IDriver
{
private $_options;
private $_pdo;
private $_insert_statement, $_insert_statement_size, $_insert_statement_fields;
public function open(array $params): void
{
$params = array_merge([
'dsn' => null,
'username' => null,
'password' => null,
'query' => null,
'table' => null
], $params);
$this->_options = $params;
$this->_pdo = new \PDO($params['dsn'], $params['username'], $params['password']);
$this->_pdo->query('SET AUTOCOMMIT');
}
public function close(): void
{
$this->_pdo = null;
}
public function get(array $params = []): \DbCopy\ICursor
{
$options = array_merge($this->_options, $params);
$statement = $this->_pdo->prepare($params['query']);
$statement->execute();
return new \DbCopy\Cursor(function() use ($options, $statement) {
$record = $statement->fetch(\PDO::FETCH_ASSOC);
if (!$record) return [];
return [$record];
});
}
public function put(array $record, array $params = []): void
{
$params = array_merge($this->_options, $params);
$sql = 'insert into "'.$params['table'].'"(';
$fields = array_keys($record);
$sql .= '"'.implode('", "', $fields).'"';
$sql .= ') VALUES ('.implode(', ', str_split(str_repeat('?', sizeof($fields)))).')';
$statement = $this->_pdo->prepare($sql);
$result = $statement->execute(array_values($record));
if ($result === false)
{
var_dump($record);
throw new \Exception("Error inserting record into Postgres: ".print_r($statement->errorInfo(), true));
}
}
public function putBatch(array $records, array $params = []): void
{
$params = array_merge($this->_options, $params);
// Find all possible fields in all records while sorting their keys
// so they're all consistent
$fields = [];
foreach ($records as &$record)
{
$fields = array_merge($fields, array_keys($record));
}
unset($record);
// Make unique
$fields = array_unique($fields);
// Sort so this is kinda stable
sort($fields);
// Generate prepared statement if one doesn't exist or it doesn't match
// the current batch
if (!isset($this->_insert_statement) ||
sizeof($records) != $this->_insert_statement_size ||
$fields !== $this->_insert_statement_fields)
{
echo 'new statement'.PHP_EOL;
$sql = 'insert into "'.$params['table'].'"(';
$sql .= '"'.implode('", "', $fields).'"';
$sql .= ') VALUES ';
$sql .= substr(str_repeat('('.implode(', ', str_split(str_repeat('?', sizeof($fields)))).'), ', sizeof($records)), 0, -2);
$this->_insert_statement = $this->_pdo->prepare($sql);
$this->_insert_statement_size = sizeof($records);
$this->_insert_statement_fields = $fields;
}
// Generate values from records
$values = [];
foreach ($records as $record)
{
foreach ($fields as $field)
{
if (isset($record[$field]))
$values[] = $record[$field];
else
$values[] = null;
}
}
// Execute statement
$result = $this->_insert_statement->execute($values);
if ($result === false)
{
throw new \Exception("Error inserting record into Postgres: ".print_r($this->_insert_statement->errorInfo(), true));
}
}
}

9
src/ICursor.php

@ -0,0 +1,9 @@
<?php
namespace DbCopy;
interface ICursor
{
public function closed(): bool;
public function get(): ?array;
}

31
src/Transform/FieldMap.php

@ -0,0 +1,31 @@
<?php
namespace DbCopy\Transform;
class FieldMap extends Transform
{
private $_fields;
public function __construct(array $fields)
{
$this->_fields = $fields;
}
public static function create(array $fields): FieldMap
{
return new FieldMap($fields);
}
public function __invoke($record): ?array
{
$new_record = [];
foreach ($this->_fields as $src=>$dst)
{
if (isset($record[$src]))
{
$new_record[$dst] = $record[$src];
}
}
return $new_record;
}
}

29
src/Transform/Group.php

@ -0,0 +1,29 @@
<?php
namespace DbCopy\Transform;
class Group extends Transform
{
private $_transforms;
public function __construct(array $transforms)
{
$this->_transforms = $transforms;
}
public static function create(array $transforms): Group
{
return new Group($transforms);
}
public function __invoke($record): ?array
{
foreach ($this->_transforms as $transform)
{
$record = $transform($record);
if (!isset($record))
return null;
}
return $record;
}
}

8
src/Transform/Transform.php

@ -0,0 +1,8 @@
<?php
namespace DbCopy\Transform;
abstract class Transform
{
public abstract function __invoke($record): ?array;
}

30
src/Transform/ValueMap.php

@ -0,0 +1,30 @@
<?php
namespace DbCopy\Transform;
class ValueMap extends Transform
{
private $_map;
public function __construct(array $map)
{
$this->_map = $map;
}
public static function create(array $map): ValueMap
{
return new ValueMap($map);
}
public function __invoke($record): ?array
{
foreach ($this->_map as $field=>$values)
{
if (isset($record[$field]) && isset($values[$record[$field]]))
{
$record[$field] = $values[$record[$field]];
}
}
return $record;
}
}
Loading…
Cancel
Save