low-level-migration

The low level migration framework is implemented in depot-spark.

The main interface is MultiStepTask, which is intended to be run on EMR Serverless. When an environment is configured with an executor of type emrserverless, an EMR Serverless application is configured automatically, along with a lambda that can be used to invoke the application. If an environment does not configure a serverless executor, it is still possible (and relatively straightforward) to set one up by hand, since all the automatic setup does is create an application with default parameters.

Interface

The lambda interface is of the form:

{
"id": "<if provided, all other properties are ignored and the result of the job with this id is returned>",
"backendVersion": "<depot platform version, defaults to the currently deployed backend version>",
"environmentUri": "<full env uri, defaults to the currently deployed env uri>",
"steps": [
...etc...
]
}

Note that backendVersion is configurable per invocation, so a given backend version can be used without first deploying it to an env.

However, the most common usage is to let most properties default to those provided by the deployed env, e.g.:

{
"steps": [
...etc...
]
}

The response is of the form:

{
"id": "<id of the invoked job>",
"status": "RUNNING" | "FAILED"
}
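
As a rough illustration of the request and response shapes above, the following Python sketch builds a request and invokes the lambda through a boto3-style client. The helper names (build_request, invoke) are hypothetical, and it is an assumption that the lambda returns the response JSON directly as its payload.

```python
import json


def build_request(steps=None, job_id=None, backend_version=None, environment_uri=None):
    """Build a lambda request; omitted properties default to the deployed env."""
    if job_id is not None:
        # When an id is provided, all other properties are ignored by the lambda.
        return {"id": job_id}
    request = {"steps": list(steps or [])}
    if backend_version is not None:
        request["backendVersion"] = backend_version
    if environment_uri is not None:
        request["environmentUri"] = environment_uri
    return request


def invoke(client, function_name, request):
    """Invoke the lambda synchronously and decode its JSON response.

    `client` is expected to behave like boto3.client("lambda").
    """
    response = client.invoke(
        FunctionName=function_name,
        Payload=json.dumps(request).encode("utf-8"),
    )
    return json.loads(response["Payload"].read())
```

With boto3 installed, this could be called as invoke(boto3.client("lambda"), "<lambda name>", build_request(steps=[...])), and the returned id passed back via build_request(job_id=...) to fetch the result later.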

This is converted to an EMR command of the form:

spark-submit --class tech.stage.depot.platform.spark.task.MultiStepTask --environment-uri <env uri> --environment-id <env id> --steps <json encoded step array>
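
To make the shape of that command concrete, here is a minimal Python sketch that assembles the same argument list. The function name spark_submit_args is hypothetical, and how the lambda actually assembles and submits the command is not covered by this document; only the class name and flags are taken from the command above.

```python
import json
import shlex

MAIN_CLASS = "tech.stage.depot.platform.spark.task.MultiStepTask"


def spark_submit_args(environment_uri, environment_id, steps):
    """Return the spark-submit argument list for a MultiStepTask run."""
    return [
        "spark-submit",
        "--class", MAIN_CLASS,
        "--environment-uri", environment_uri,
        "--environment-id", environment_id,
        # The step array is passed as a single JSON encoded argument.
        "--steps", json.dumps(steps, separators=(",", ":")),
    ]


args = spark_submit_args("<env uri>", "<env id>", [{"step": "ExportToJson"}])
# shlex.join quotes each argument so the line can be pasted into a shell.
print(shlex.join(args))
```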

Internal format

Most steps operate by taking data from some external format (e.g. DynamoDB export format), converting the data into the internal format, and/or converting data back to external formats or writing directly to storage.

The internal format is Parquet partitioned in S3 as follows:

<root>/datasetId=<dsid>/store=<storename>/schema=<schemaname>/schemaVersion=1

The schemaVersion field should be ignored. It is NOT the schema version as set by the developer of the schema, and it should always be 1. If that ever changes, this document should be updated to reflect the change.

Each record in the S3 location has the above partition values and a single additional column, data, which holds the basestar serialized form of the input record (binary format).

This layout allows us to operate on all records of all datasets/stores/schemas at once, avoids the need to work with Spark schemas, and lets us work with 'almost correct' records, which is important because these tools are designed to facilitate repair. It does mean that data needs to be marshalled/unmarshalled out of the data column, but this is an easily parallelizable operation.
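
The partition layout described above can be sketched in Python as a pair of helpers that build and parse partition paths. The helper names are hypothetical and are not part of depot-spark; they only illustrate the directory structure.

```python
import re


def partition_path(root, dataset_id, store, schema):
    """Build the S3 prefix for one dataset/store/schema partition."""
    # schemaVersion is always 1 in the internal format.
    return (
        f"{root.rstrip('/')}/datasetId={dataset_id}"
        f"/store={store}/schema={schema}/schemaVersion=1"
    )


_PARTITION = re.compile(
    r"/datasetId=(?P<datasetId>[^/]+)"
    r"/store=(?P<store>[^/]+)"
    r"/schema=(?P<schema>[^/]+)"
    r"/schemaVersion=(?P<schemaVersion>[^/]+)"
)


def parse_partition(path):
    """Extract the partition values (as strings) from a path in the internal layout."""
    match = _PARTITION.search(path)
    if match is None:
        raise ValueError(f"not an internal format path: {path}")
    return match.groupdict()
```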

Common step configuration parameters

There are some common parameters/custom types in the configuration interface for steps. These are:

inputUri/outputUri: (object-uris)

This is either a string URI for the top level input/output path, or a map of URIs for each type of input/output. Passing e.g. "s3://my-bucket/my-prefix" is shorthand for:

{
"object": "s3://my-bucket/my-prefix/object",
"history": "s3://my-bucket/my-prefix/history",
"index": "s3://my-bucket/my-prefix/index"
}

In most circumstances, passing a string URI is preferred, but the additional flexibility allows for excluding/combining parts of the input/output of any task which supports <object-uris> configuration parameters.
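
The shorthand expansion can be written out as a small Python sketch. The function name expand_object_uris is hypothetical; stripping a trailing slash from the prefix is an assumption made here to avoid double slashes, not documented behaviour.

```python
OBJECT_URI_KINDS = ("object", "history", "index")


def expand_object_uris(value):
    """Expand a string URI into the per-kind map; pass an explicit map through."""
    if isinstance(value, str):
        base = value.rstrip("/")  # assumption: a trailing slash is not significant
        return {kind: f"{base}/{kind}" for kind in OBJECT_URI_KINDS}
    # An explicit map may omit kinds, e.g. to exclude history from a step.
    return dict(value)
```

For example, passing {"object": "s3://my-bucket/my-prefix/object"} would leave history and index out of the step's input or output.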

datasets/stores: (id-filter)

This is used to filter either datasets or stores, in tasks that support it.

Three configuration properties are supported: a set of ids to include, a set of ids to exclude, and a remapping from included input ids to output ids:

{
"include": ["id1", "id2", ...],
"exclude": ["id3", "id4", ...],
"remapping": [
{
"source": "id5",
"target": "id6"
}
]
}
  • include: if specified, only the provided input ids will be included; if not specified, all will be included (unless excluded).
  • exclude: if specified, the provided ids will not be included. This overrides include (i.e. include: a, exclude: a will exclude a).
  • remapping: for each entry, the source id is turned into the target id on output.

The following special ids are also supported:

  • __default the default id, only applicable for stores. If this is used as an include then only the default store will be processed; if it is used as a remapping target then remapped records will appear to be from the default store. The intended usage is to take records from the active store, emit them as if they are on the default store, transform/otherwise process them, and then export them to some target store, without needing to know the id of the initial store during the import/transform step.
  • __active active ids only. For stores this is the active store for the currently processed dataset (as determined by the environment's State table); for datasets it is all datasets that are referenced in the environment uri. This can be used to avoid processing stale/orphaned data (not applicable as a remapped target id).
  • __all this is mostly useful in remapping, since it can be used to e.g. combine records from all included stores and emit them as if they were in some other store (not applicable as a remapped target id).
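
The include/exclude/remapping rules can be summarised in a short Python sketch. The function name apply_id_filter is hypothetical, and the special ids (__default, __active, __all) are deliberately not handled, since resolving them depends on environment state that is not modelled here.

```python
def apply_id_filter(ids, id_filter):
    """Return a map of input id -> output id for the ids that pass the filter."""
    include = id_filter.get("include")  # None means "include everything"
    exclude = set(id_filter.get("exclude", []))
    remapping = {
        entry["source"]: entry["target"]
        for entry in id_filter.get("remapping", [])
    }
    result = {}
    for input_id in ids:
        if include is not None and input_id not in include:
            continue
        if input_id in exclude:
            # exclude overrides include
            continue
        # Ids without a remapping entry keep their original id on output.
        result[input_id] = remapping.get(input_id, input_id)
    return result
```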

Steps

The following steps are supported. In general, steps that bring data into the internal format are named Import steps, whilst steps that take data from the internal format and write it to external formats or to other storage are named Export steps. This may seem counter-intuitive: e.g. one would export data from DynamoDB, import it to the internal format, and then export it to S3 in a different external format or to another storage.
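
As an illustration of how Import and Export steps chain together, the following Python sketch builds a two-step request that imports from the environment's storage into the internal format and then exports it as JSON for inspection. The bucket and prefixes are hypothetical, and it is an assumption that the framework accepts the timestamp form produced by isoformat(); the step and property names are those documented in the sections for each step.

```python
import json
from datetime import datetime, timezone

# Hypothetical locations; replace with real S3 prefixes.
INTERNAL_URI = "s3://my-bucket/migration/internal"
JSON_URI = "s3://my-bucket/migration/json"

steps = [
    {
        # Storage -> internal format
        "step": "ImportFromStorage",
        "pointInTime": datetime(2024, 1, 1, tzinfo=timezone.utc).isoformat(),
        "outputUri": INTERNAL_URI,
        "overwrite": True,
        "datasets": {"include": ["__active"]},
        "stores": {"include": ["__active"]},
        "strict": True,
    },
    {
        # Internal format -> human-readable JSON lines
        "step": "ExportToJson",
        "inputUri": INTERNAL_URI,
        "outputUri": JSON_URI,
        "overwrite": True,
    },
]

# All other request properties default to the deployed env.
request = {"steps": steps}
print(json.dumps(request, indent=2))
```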

ImportFromDynamoDB

Takes data in the DynamoDB export format, converts it to the internal format, and outputs it to S3.

{
"step": "ImportFromDynamoDB",
"inputUri": <object-uris>,
"outputUri": <object-uris>,
"overwrite": true | false,
"datasets": <id-filter>,
"stores": <id-filter>,
"strict": true | false // if false then input props will be coerced if possible
}

ImportFromStorage

Takes data from the storage configured for the environment and outputs it to S3 in the internal format.

The current version of each object, its history, and the records for any index are imported.

Currently, only DynamoDB is supported, and this step in fact just triggers a DynamoDB export job, waits for it to complete, and then runs an ImportFromDynamoDB step.

{
"step": "ImportFromStorage",
"pointInTime": <timestamp in ISO8601 format>,
"outputUri": <object-uris>,
"overwrite": true | false,
"datasets": <id-filter>,
"stores": <id-filter>,
"strict": true | false // if false then input props will be coerced if possible
}

ImportFromJson

Takes data from S3 in JSON lines format and outputs in the internal format.

The input JSON format is the format in which objects appear over e.g. the REST API (a single object per line). Records in the input location do not need to be of the same schema; they only need a schema property per record, which can be provided as an input partition, e.g. schema=X/records.json, or as a property on each record. However, this step can only work with a single dataset/store, that is, all records in the output will be generated with the provided dataset/store id.
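
The two ways of supplying the schema can be sketched in Python as follows. This is only an illustration: the function name schema_for is hypothetical, the record property is assumed to be called schema, and the precedence (record property first, then path partition) is an assumption rather than documented behaviour.

```python
import re

_SCHEMA_PARTITION = re.compile(r"(?:^|/)schema=([^/]+)/")


def schema_for(path, record):
    """Resolve the schema name for one JSON record read from `path`."""
    if "schema" in record:
        # Assumed property name and precedence.
        return record["schema"]
    match = _SCHEMA_PARTITION.search(path)
    if match is not None:
        # e.g. .../schema=X/records.json
        return match.group(1)
    raise ValueError(f"no schema for record in {path}")
```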

This also supports history, although it is likely more practical to use ImportFromStorage or ImportFromDynamoDB to capture this if it already exists. The only conceivable use of this (aside from tests, which is why it exists) is e.g. exporting object + history data from Snowflake/SQL using COPY INTO <location> and using that as input to hydrate DynamoDB.

This also supports indexes, although the input format is somewhat complicated and, as with history, it is almost certainly more practical to capture this from storage. It is also not required for the Snowflake use case suggested above, nor would it ever be required for a SQL engine (even one that did support indexes).

{
"step": "ImportFromJson",
"inputUri": <object-uris>,
"outputUri": <object-uris>,
"overwrite": true | false,
"datasetId": "dataset id used in the output",
"storeId": "store id used in the output",
"strict": true | false // if false then input props will be coerced if possible
}

BuildIndexes

Takes data in the internal format, and creates index records according to the indexes configured.

This step can be used to backfill DynamoDB indexes. It is not required when exporting/importing from DynamoDB if indexes have not changed, since all index data is already processed in DynamoDB import/export.

{
"step": "BuildIndexes",
"inputUri": <object-uris>,
"outputUri": <object-uris>,
"overwrite": true | false,
"datasets": <id-filter>,
"stores": <id-filter>
}

ExportToStorage

Takes data in the internal format, and writes it to configured storage.

This supports all extant target storage except Cognito users/groups. It loads data bypassing any replication mode: for a composite storage with ASYNC replication, it behaves as if it had IMMEDIATE replication, and when the step finishes the primary and the replica will both contain identical data.

The advantage of this over stream + rollup for migration is that all history is preserved exactly as it appears in the original location.

Use this task carefully! It assumes that, for all input dataset + store combinations, there is no data currently present in that dataset's location(s). The behaviour when this assumption is violated is undefined (bad).

{
"step": "ExportToStorage",
"inputUri": <object-uris>,
"datasets": <id-filter>,
"stores": <id-filter>
}

ExportToJson

Takes data in the internal format and writes it to S3 in the REST JSON format (one object per line). This step is mostly for debugging convenience (the internal format is not human-readable).

{
"step": "ExportToJson",
"inputUri": <object-uris>,
"outputUri": <object-uris>,
"overwrite": true | false
}

ExportToStream

Takes data in the internal format and writes it to S3 in the stream Parquet format. This step can be used to rebuild all stream data as if it was created at a single point in time, in particular to repair some issue with the existing stream data. It can also be used to rebuild Snowflake data without risking impact to DynamoDB data (i.e. to generate input to the existing rollup process).

{
"step": "ExportToStream",
"inputUri": <object-uris>,
"outputUri": <string>, // note this only supports an S3 directory, different from other outputUris
"overwrite": true | false,
"datasets": <id-filter>,
"stores": <id-filter>,
"timestamp": <ISO8601 timestamp>
}

Other steps

Other steps not documented here should be considered one-offs/very specialised and should eventually be deprecated/deleted.