commit adc0899acc5d74117a618200bb22243a8490225b Author: Adam Pippin Date: Thu Feb 4 12:01:57 2021 -0800 basic functionality diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..184c76c --- /dev/null +++ b/composer.json @@ -0,0 +1,5 @@ +{ + "autoload": { + "psr-4": { "DbCopy\\": "src/" } + } +} diff --git a/db_copy.php b/db_copy.php new file mode 100644 index 0000000..7cadf88 --- /dev/null +++ b/db_copy.php @@ -0,0 +1,64 @@ +open($source_config); + +$target_driver = $config['target']['driver']; +$target_config = $config['target']['config']; +$class = '\\DbCopy\\Driver\\'.$target_driver; +$target = new $class(); +$target->open($target_config); + +// Instantiate each configured transform; they are chained per copy below. +$transforms = []; +if (isset($config['transform'])) +{ + foreach ($config['transform'] as $transform) + { + $transform_type = $transform['type']; + $transform_config = $transform['config']; + $class = '\\DbCopy\\Transform\\'.$transform_type; + $transforms[] = new $class($transform_config); + } +} + +$filter_column = $config['filter']['column']; +$filter_values = $config['filter']['values']; + +// Copy::create() requires an array, so fall back to the defaults when the +// config has no "options" section. +$copy_options = $config['options'] ?? []; + +$results = []; + +// Run one copy per filter value and remember how many records each one wrote. +foreach ($filter_values as $filter_value) +{ + echo "===== COPYING $filter_column=$filter_value =====".PHP_EOL; + + $copy = \DbCopy\Copy::create($copy_options); + // FIXME: This query thing is ES-specific + $copy->source($source, ['query'=>[$filter_column=>$filter_value]]); + $copy->destination($target); + if (!empty($transforms)) + $copy->transform(\DbCopy\Transform\Group::create($transforms)); + $count = $copy->execute(); + echo "Copied: $count records".PHP_EOL; + $results[$filter_value] = $count; +} + +var_dump($results); + diff --git a/src/Copy.php b/src/Copy.php new file mode 100644 index 0000000..311ccaa --- /dev/null +++ b/src/Copy.php @@ -0,0 +1,117 @@ +_options = array_merge([ + 'limit' => null, + 'batch' => 2500, + 'status_interval' => 5 + ], $params); + } + + /** + * Fluent factory: build a Copy with the given options. + */ + public static function create(array $params = []): Copy + { + return new
Copy($params); + } + + /** + * Set the driver to read from and the parameters passed to its get(). + */ + public function source(IDriver $source, array $params = []): Copy + { + $this->_src = $source; + $this->_srcParams = $params; + return $this; + } + + /** + * Set the driver to write to and the parameters passed to its put()/putBatch(). + */ + public function destination(IDriver $destination, array $params = []): Copy + { + $this->_dst = $destination; + $this->_dstParams = $params; + return $this; + } + + /** + * Set a callable applied to every record before it is written. + * Records for which it returns an empty value are dropped. + */ + public function transform(callable $callback): Copy + { + $this->_transform = $callback; + return $this; + } + + /** + * Copy records from the source to the destination in batches. + * + * Reads up to `batch` records at a time from the source cursor, applies the + * optional transform, drops empty records, writes the rest with putBatch() + * (when the destination has one and `batch` > 1) or put(), and echoes a + * progress line once `status_interval` seconds have passed since the last one. + * Stops when the cursor is exhausted or when at least `limit` records have + * been written (a null or zero `limit` means no limit). + * + * @return int Number of records written to the destination. + */ + public function execute(): int + { + $cursor = $this->_src->get($this->_srcParams); + $transform = $this->_transform; + $limit = $this->_options['limit']; + $hasLimit = isset($limit) && $limit > 0; + $count = 0; + + for (;;) + { + // Read records + $records = []; + $batchSize = 0; + // Compare against null explicitly: an empty record ([]) is falsy and + // must not be mistaken for the end of the cursor. + while (($record = $cursor->get()) !== null) + { + $records[] = $record; + $batchSize++; + if ($batchSize >= $this->_options['batch'] || + ($hasLimit && $count + $batchSize >= $limit)) + break; + } + + if ($batchSize == 0) + { + // Done copying! + return $count; + } + + // Transform records if applicable + if (is_callable($transform)) + { + foreach ($records as &$record) + { + $record = $transform($record); + } + unset($record); + } + + // Drop records that are empty (read that way or rejected by the transform) + $records = array_filter($records); + + // Write records, unless the whole batch was dropped + if (!empty($records)) + { + if (method_exists($this->_dst, 'putBatch') && $this->_options['batch'] > 1) + { + $this->_dst->putBatch($records, $this->_dstParams); + } + else + { + // Otherwise write one by one + foreach ($records as $record) + { + $this->_dst->put($record, $this->_dstParams); + } + } + } + + $count += sizeof($records); + + // Enforce the limit on every pass, not only on the batch that first reaches it + if ($hasLimit && $count >= $limit) + { + return $count; + } + + $now = time(); + if (isset($this->_options['status_interval']) && + $this->_status_lastUpdate + $this->_options['status_interval'] < $now) + { + $processed = $count - $this->_status_lastCount; + // max() guards against a zero elapsed time + $pps = round($processed / max(1, $now - $this->_status_lastUpdate), 0); + echo "Complete: ".$count."; Records/sec: ".$pps.PHP_EOL; + $this->_status_lastUpdate = $now; + $this->_status_lastCount = $count; + } + } + } + +} diff --git a/src/Cursor.php b/src/Cursor.php new file mode 100644 index 0000000..39cafd8 --- /dev/null +++ b/src/Cursor.php @@
-0,0 +1,44 @@ +_callback = $callback; + $this->_closed = false; + $this->_buffer = []; + } + + /** + * Whether the cursor has been exhausted. + */ + public function closed(): bool + { + return $this->_closed; + } + + /** + * Return the next record, refilling the buffer from the callback when it + * runs dry. Returns null (and marks the cursor closed) once the callback + * yields an empty batch. + */ + public function get(): ?array + { + if ($this->_closed) + { + return null; + } + + if (sizeof($this->_buffer) == 0) + { + // Fill more data + $callback = $this->_callback; + $this->_buffer = $callback(); + if (sizeof($this->_buffer) == 0) + { + $this->_closed = true; + return null; + } + } + + return array_shift($this->_buffer); + } +} diff --git a/src/Driver/Elasticsearch.php b/src/Driver/Elasticsearch.php new file mode 100644 index 0000000..32efcfb --- /dev/null +++ b/src/Driver/Elasticsearch.php @@ -0,0 +1,114 @@ + null, + 'index' => null, + 'retries' => 10, + 'size' => 5000, + 'scroll_window' => '5m', + 'query' => null, + 'timeout' => 500000 + ], $params); + + $params = filter_var_array($params, [ + 'endpoint' => ['filter'=>FILTER_VALIDATE_URL, 'flags'=>FILTER_REQUIRE_SCALAR], + 'index' => ['filter'=>FILTER_SANITIZE_ENCODED, 'flags'=>FILTER_REQUIRE_SCALAR], + 'retries' => ['filter'=>FILTER_VALIDATE_INT, 'flags'=>FILTER_REQUIRE_SCALAR], + 'size' => ['filter'=>FILTER_VALIDATE_INT, 'flags'=>FILTER_REQUIRE_SCALAR], + 'scroll_window' => ['filter'=>FILTER_UNSAFE_RAW, 'flags'=>FILTER_REQUIRE_SCALAR], + 'query' => ['filter'=>FILTER_UNSAFE_RAW, 'flags'=>FILTER_REQUIRE_ARRAY] + ]); + + $this->_options = $params; + } + + public function close(): void + { + // noop + } + + /** + * Open a scrolling search and return a cursor over the hits' _source. + * + * The first request goes to {endpoint}/{index}/_search with the scroll + * window, page size and (optionally) a "field:value AND ..." query; later + * requests POST the scroll id to {endpoint}/_search/scroll. A request for + * which curl_exec() fails is retried up to `retries` times, with the wait + * doubling after each failure. + * + * @param array $params Overrides for the options given to open(). + * @throws \Exception When a response has no scroll id or all retries fail. + */ + public function get(array $params = []): \DbCopy\ICursor + { + $options = array_merge($this->_options, $params); + $state = (object)[ + 'scroll_id' => null, + ]; + + return new \DbCopy\Cursor(function() use ($options, $state) { + // Initial back-off in microseconds + $retry_timeout = 500000; + for ($i=0; $i<$options['retries']; $i++) + { + $ch = curl_init(); + + if (isset($state->scroll_id)) + { + $body = json_encode(['scroll'=>$options['scroll_window'], 'scroll_id'=>$state->scroll_id]); + curl_setopt($ch, CURLOPT_URL, $options['endpoint'].'/_search/scroll'); + curl_setopt($ch, 
CURLOPT_CUSTOMREQUEST, 'POST'); + curl_setopt($ch, CURLOPT_HTTPHEADER, [ + 'Content-Type: application/json', + 'Content-Length: '.strlen($body) + ]); + curl_setopt($ch, CURLOPT_POSTFIELDS, $body); + } + else + { + $query_string = []; + $query_string[] = 'scroll='.$options['scroll_window']; + $query_string[] = 'size='.$options['size']; + if (isset($options['query']) && $options['query'] !== false) + { + $search_params = []; + foreach ($options['query'] as $k=>$v) + { + $search_params[] = $k.':'.$v; + } + // The query contains spaces (" AND ") and arbitrary values, so it + // has to be percent-encoded before it goes into the URL. + $query_string[] = 'q='.rawurlencode(implode(' AND ', $search_params)); + } + + curl_setopt($ch, CURLOPT_URL, $options['endpoint'].'/'.$options['index'].'/_search?'.implode('&', $query_string)); + } + + curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); + $response = curl_exec($ch); + curl_close($ch); + + if ($response === false) + { + // Transport failure: wait, double the wait, try again + usleep($retry_timeout); + $retry_timeout = $retry_timeout * 2; + continue; + } + + $response = json_decode($response, true); + + if (!isset($response['_scroll_id'])) + { + var_dump($response); + throw new \Exception("Elasticsearch response did not contain scroll id"); + } + + $state->scroll_id = $response['_scroll_id']; + + return array_map(function($es_arr) { + return $es_arr['_source']; + }, $response['hits']['hits']); + } + throw new \Exception("Request failed after ".$options['retries']." 
retries."); + }); + } + + /** + * Writing to Elasticsearch is not supported; always throws. + * $params is accepted (and ignored) to match the other drivers' put(). + */ + public function put(array $record, array $params = []): void + { + throw new \Exception("Not implemented"); + } + +} diff --git a/src/Driver/IDriver.php b/src/Driver/IDriver.php new file mode 100644 index 0000000..6cc32e8 --- /dev/null +++ b/src/Driver/IDriver.php @@ -0,0 +1,12 @@ + null, + 'username' => null, + 'password' => null, + 'query' => null, + 'table' => null + ], $params); + + $this->_options = $params; + + $this->_pdo = new \PDO($params['dsn'], $params['username'], $params['password']); + $this->_pdo->query('SET AUTOCOMMIT'); + } + + public function close(): void + { + $this->_pdo = null; + } + + /** + * Run the configured query and return a cursor yielding one associative + * row per get(). + * + * @param array $params Overrides for the options given to open(); the + * `query` option is the SQL that is executed. + */ + public function get(array $params = []): \DbCopy\ICursor + { + $options = array_merge($this->_options, $params); + + // Use the merged options so a query configured in open() is honoured + $statement = $this->_pdo->prepare($options['query']); + $statement->execute(); + + return new \DbCopy\Cursor(function() use ($options, $statement) { + $record = $statement->fetch(\PDO::FETCH_ASSOC); + if (!$record) return []; + return [$record]; + }); + + + } + + /** + * Insert a single record into the `table` option's table, using the + * record's keys as column names. + * + * @throws \Exception When the insert fails. + */ + public function put(array $record, array $params = []): void + { + $params = array_merge($this->_options, $params); + + $sql = 'insert into "'.$params['table'].'"('; + $fields = array_keys($record); + $sql .= '"'.implode('", "', $fields).'"'; + $sql .= ') VALUES ('.implode(', ', str_split(str_repeat('?', sizeof($fields)))).')'; + $statement = $this->_pdo->prepare($sql); + $result = $statement->execute(array_values($record)); + if ($result === false) + { + var_dump($record); + throw new \Exception("Error inserting record into Postgres: ".print_r($statement->errorInfo(), true)); + } + } + + /** + * Insert many records with one multi-row INSERT. + * + * The column list is the sorted union of all records' keys; a record + * without a given key gets NULL for it. The prepared statement is reused + * while the batch size and column list stay the same. + * + * @throws \Exception When the insert fails. + */ + public function putBatch(array $records, array $params = []): void + { + // An empty batch would produce an invalid, empty VALUES clause + if (empty($records)) + return; + + $params = array_merge($this->_options, $params); + + // Find all possible fields in all records while sorting their keys + // so they're all consistent + $fields = []; + foreach ($records as &$record) + { + $fields = array_merge($fields, array_keys($record)); + } + unset($record); + // Make unique + $fields = 
array_unique($fields); + // Sort so this is kinda stable + sort($fields); + + // Generate prepared statement if one doesn't exist or it doesn't match + // the current batch + if (!isset($this->_insert_statement) || + sizeof($records) != $this->_insert_statement_size || + $fields !== $this->_insert_statement_fields) + { + echo 'new statement'.PHP_EOL; + $sql = 'insert into "'.$params['table'].'"('; + $sql .= '"'.implode('", "', $fields).'"'; + $sql .= ') VALUES '; + $sql .= substr(str_repeat('('.implode(', ', str_split(str_repeat('?', sizeof($fields)))).'), ', sizeof($records)), 0, -2); + $this->_insert_statement = $this->_pdo->prepare($sql); + $this->_insert_statement_size = sizeof($records); + $this->_insert_statement_fields = $fields; + } + + // Generate values from records + $values = []; + foreach ($records as $record) + { + foreach ($fields as $field) + { + if (isset($record[$field])) + $values[] = $record[$field]; + else + $values[] = null; + } + } + + // Execute statement + $result = $this->_insert_statement->execute($values); + + if ($result === false) + { + throw new \Exception("Error inserting record into Postgres: ".print_r($this->_insert_statement->errorInfo(), true)); + } + + } + +} diff --git a/src/ICursor.php b/src/ICursor.php new file mode 100644 index 0000000..502d06d --- /dev/null +++ b/src/ICursor.php @@ -0,0 +1,9 @@ +_fields = $fields; + } + + public static function create(array $fields): FieldMap + { + return new FieldMap($fields); + } + + /** + * Build a new record containing only the mapped fields, renamed from + * source key to destination key. Source fields that are unset or null + * are omitted. + */ + public function __invoke($record): ?array + { + $new_record = []; + foreach ($this->_fields as $src=>$dst) + { + if (isset($record[$src])) + { + $new_record[$dst] = $record[$src]; + } + } + return $new_record; + } +} diff --git a/src/Transform/Group.php b/src/Transform/Group.php new file mode 100644 index 0000000..005348e --- /dev/null +++ b/src/Transform/Group.php @@ -0,0 +1,29 @@ +_transforms = $transforms; + } + + public static function create(array $transforms): Group + { + return new Group($transforms); 
+ } + + /** + * Apply each transform in order; stop and return null as soon as one of + * them returns null. + */ + public function __invoke($record): ?array + { + foreach ($this->_transforms as $transform) + { + $record = $transform($record); + if (!isset($record)) + return null; + } + return $record; + } +} diff --git a/src/Transform/Transform.php b/src/Transform/Transform.php new file mode 100644 index 0000000..c727e98 --- /dev/null +++ b/src/Transform/Transform.php @@ -0,0 +1,8 @@ +_map = $map; + } + + public static function create(array $map): ValueMap + { + return new ValueMap($map); + } + + /** + * For each mapped field, replace the record's value with its mapped value + * when one is defined; other values are left untouched. + */ + public function __invoke($record): ?array + { + foreach ($this->_map as $field=>$values) + { + if (isset($record[$field]) && isset($values[$record[$field]])) + { + $record[$field] = $values[$record[$field]]; + } + } + return $record; + } +}