
db_copy

Simple tool for shuffling data around.

Why?

It's a stream-based, enterprise-ready, data warehousing ETL as a service.

Oh wait, no it's not. It's 500 lines of code with effectively zero dependencies.

When you just need to shift a bit of data around for some basic proof-of-concept work or load testing, it does a thing with minimal fuss.

What?

Run db_copy.php and pass it a JSON file that defines your connections, the data to move, and any transforms to apply along the way.

E.g.,:

$ cat > job_definition.json
{
	"source": {
		"driver": "Elasticsearch",
		"config": {
			"endpoint": "https://my-cluster.internal",
			"size": 10000,
			"index": "my_index"
		}
	},
	"target": {
		"driver": "Postgres",
		"config": {
			"dsn": "pgsql:host=my-postgres.internal;port=5432;dbname=mydb",
			"username": "mydb",
			"password": "mydbpw",
			"table": "my_new_table"
		}
	},
	"filter": {
		"column": "date",
		"values": [
			"20210101",
			"20210102",
			"20210103",
			"20210104",
			"20210105"
		]
	},
	"transform": [
		{
			"type": "FieldMap",
			"config": {
				"source_column": "target_column",
				"internal_id": "id",
				"aFieldName": "a_field_name"
				"stringColumn", "int_column"
			}
		},
		{
			"type": "ValueMap",
			"config": {
				"int_column": {
					"Unknown": -1
				}
			}
		}
	]
}
^D

$ php db_copy.php job_definition.json
===== COPYING date=20210101 =====
new statement
Complete: 2500; Records/sec: 0
Complete: 32500; Records/sec: 5000
Complete: 62500; Records/sec: 5000
Copied: 62500 records
[...]
===== COPYING date=20210105 =====
new statement
Complete: 2500; Records/sec: 0
Complete: 32500; Records/sec: 5000
Copied: 32500 records

array(5) {
  [20210101]=>
  int(62500)
  [20210102]=>
  int(12345)
  [20210103]=>
  int(67890)
  [20210104]=>
  int(55234)
  [20210105]=>
  int(32500)
}

$

Job Definition Syntax

{
	"options": {
		// Program-level options
	},
	"source": {
		// Source data store
	},
	"target": {
		// Target data store
	},
	"filter": {
		// If we want a subset of the data
	},
	"transform": {
		// Any transformations to apply to the data between extract and load
	}
}

Options

{
	"options": {
		"batch": 2500,
		"limit": 10000,
		"status_interval": 5
	}
}

batch (2500) controls how many records to load into memory, transform, and insert at a time. You probably want to take a few things into account when setting this:

  • This is probably best set as some factor/multiple of your source data's batch size, if applicable.
  • Postgres supports a maximum of 65,535 bound parameters per query, so with the Postgres target this value cannot exceed 65535 / $number_of_fields_in_each_inserted_record (see the worked example after this list).
  • This is how many records to buffer into memory. Increasing this increases RAM requirements.
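
For instance, assuming each inserted record has 13 columns (a purely hypothetical figure), the 65,535-parameter limit caps the usable batch at floor(65535 / 13) = 5041, so a job might set:

{
	"options": {
		"batch": 5000
	}
}

5000 also lines up with the Elasticsearch driver's default size of 5000, which takes care of the first point above.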

limit (null), if set, stops the copy after this many records have been consumed. I will not guarantee there isn't an off-by-one error here.

status_interval (5) sets the minimum time in seconds between outputting status updates on how many records have been copied. If you set this to 0 it will output after every single batch. If you set it to null the status updates will be disabled.

Source/Target

{
	"source": {
		"driver": "", // Elasticsearch and Postgres, maps directly to classes in src/Driver/
		"config": {
			// Passed directly through to driver
		}
	}
}

Postgres supports:

  • dsn: PDO DSN for connection
  • username, password: Database credentials
  • table: Table to load from/to

Elasticsearch supports:

  • endpoint: HTTP endpoint for the ES node
  • index: Index to load data from
  • retries (10): Number of times to retry a request in case of intermittent connection issues.
  • timeout (500000): Initial back-off for retries, in microseconds (default 500ms). Doubled on each subsequent retry of a single request.
  • scroll_window (5m): How long Elasticsearch should keep the cursor open. Consider increasing this if your inserts are taking longer than this window.
  • size (5000): How many documents to fetch in each request from Elasticsearch
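
A fuller Elasticsearch source block using the optional tuning keys might look like this (values are illustrative: retries and timeout are just the defaults written out, and the scroll window is bumped to 10 minutes):

{
	"source": {
		"driver": "Elasticsearch",
		"config": {
			"endpoint": "https://my-cluster.internal",
			"index": "my_index",
			"size": 10000,
			"retries": 10,
			"timeout": 500000,
			"scroll_window": "10m"
		}
	}
}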

Filter

{
	"column": "",     // name of column to filter on
	"values": [
		// List of values to include
	]
}
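
As the example run above shows, each filter value gets its own copy pass, and the array dumped at the end reports the number of records copied per value.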

Transform

{
	"transform": [
		{
			"type": "", // Name of the transform, directly maps to a class in src/Transform,
			"config": [
				// Passed directly through to transform class
			]
		}
	]
}

Transforms are performed in the order they're listed. Subsequent transforms will see the output of the previous transforms.
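
In the example at the top, the FieldMap runs first and renames stringColumn to int_column, which is why the ValueMap after it refers to int_column rather than the original column name.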

There are currently two transforms:

FieldMap

Config is a dictionary mapping a source column name to a target column name. Any column not listed will be removed.
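
For example, a config like this (column names are hypothetical) would keep only two columns and rename them on the way through:

{
	"type": "FieldMap",
	"config": {
		"createdAt": "created_date",
		"documentId": "doc_id"
	}
}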

ValueMap

Config is a dictionary keyed by column name, with each entry mapping source values to target values for that column. Any value not mapped will be passed through as-is.
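
For example, this config (column and values are hypothetical) would rewrite two placeholder strings in the status column and pass every other value through untouched:

{
	"type": "ValueMap",
	"config": {
		"status": {
			"Unknown": -1,
			"N/A": -2
		}
	}
}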

Limitations

  • It's a straightforward single thread that just loads a chunk of data from a source then puts it in a destination. If you're trying to move a couple hundred million records you probably wanna go get a coffee.
  • There's no support for resuming or anything. If you're trying to get rid of a document store because it's full of janky data, it might get annoying.
  • The only implemented drivers are Elasticsearch and Postgres. It only supports ES -> Postgres right now because:
    • I didn't implement writing in the Elasticsearch driver.
    • The main db_copy script passes the filter parameters in a format specific to Elasticsearch.

License

Copyright (c) 2021 Adam Pippin. All rights reserved.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

    1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
    2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
    3. All advertising materials mentioning features or use of this software must display the following acknowledgement:
    This product includes software developed by Adam Pippin.
    4. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY COPYRIGHT HOLDER "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL COPYRIGHT HOLDER BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.