
Transactions

A transaction is a bulk data update, consisting of a series of operations performed against a given dataset. A transaction is isolated to a dataset, and either all actions succeed, or all actions fail and no changes are made.

To run a transaction, issue a request to the dataset's transaction endpoint, with the following syntax:

{
"actions": [
"list of action objects"
]
}
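As a rough illustration of the envelope above, the following Python sketch assembles a transaction body from action objects. The helper name build_transaction is hypothetical and not part of Depot; the request itself is not shown because the endpoint URL and authentication are not specified here.

```python
import json


def build_transaction(*actions):
    """Wrap action objects in the {"actions": [...]} envelope (hypothetical helper)."""
    for action in actions:
        # Every action type documented on this page carries an "operation" key.
        if "operation" not in action:
            raise ValueError(f"action is missing 'operation': {action!r}")
    return {"actions": list(actions)}


payload = build_transaction(
    {"operation": "UPSERT", "locationUri": "s3://petstore/ingest1", "format": "JSON"},
)
body = json.dumps(payload, indent=2)  # JSON text to send as the request body
print(body)
```

Actions are kept in the order given, so a transaction with several actions can be built by passing several dictionaries.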

Where each action object may be one of the following types:

Upsert action (from S3 location)

This action type upserts (i.e. creates or updates) all object schemas in the dataset with data made available in S3. JSON and Parquet formats are supported.

Syntax

{
"operation": "UPSERT",
"locationUri": "string",
"format": "string"
}
  • locationUri - S3 path to a folder containing data laid out by schema fully-qualified name.
  • format - input format, see Formats section
info

In a Snowflake transaction, a single Snowflake stage must exist and be accessible, providing access to the specified location. See https://docs.snowflake.com/en/sql-reference/ddl-stage.html#stage-management.

For S3 locations, the locationUri should be exact, with no trailing slash characters.

Example

Assuming the following schema:

petstore.Dog:
  properties:
    name:
      type: string

And the following pre-existing data:

id   | version | name
dog1 | 1       | Scooby Doo

And the following data in the S3 location s3://petstore/ingest1/petstore/Dog/anyfile.json:

{
"id": "dog1",
"name": "Scooby"
}
{
"id": "dog2",
"name": "Lassie"
}
{
"id": "dog3",
"name": "Clifford"
}

Then executing the following transaction:

{
"actions": [
{
"operation": "UPSERT",
"locationUri": "s3://petstore/ingest1",
"format": "JSON"
}
]
}

Will result in the following:

id   | version | name
dog1 | 2       | Scooby
dog2 | 1       | Lassie
dog3 | 1       | Clifford
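The upsert behaviour in the example above can be modelled in memory as a minimal Python sketch. It is not Depot's implementation: the upsert function is hypothetical, and the sketch assumes that the version starts at 1 and is only incremented when the incoming data differs from the stored row.

```python
def upsert(target, incoming):
    """Model upsert semantics on a dict of rows keyed by id (illustrative only)."""
    result = {key: dict(row) for key, row in target.items()}
    for record in incoming:
        existing = result.get(record["id"])
        if existing is None:
            # No row with this id yet: create it at version 1.
            result[record["id"]] = {**record, "version": 1}
        elif any(existing.get(k) != v for k, v in record.items()):
            # Row exists and differs: replace it and bump the version (assumption).
            result[record["id"]] = {**record, "version": existing["version"] + 1}
    return result


existing = {"dog1": {"id": "dog1", "version": 1, "name": "Scooby Doo"}}
incoming = [
    {"id": "dog1", "name": "Scooby"},
    {"id": "dog2", "name": "Lassie"},
    {"id": "dog3", "name": "Clifford"},
]
after = upsert(existing, incoming)
for row in after.values():
    print(row["id"], row["version"], row["name"])
```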

Upsert action (from table to table)

This action type upserts data directly from one table to another within the same database or account, without requiring intermediate S3 storage. This feature is available for Snowflake and Aurora/PostgreSQL locations.

Syntax

{
"operation": "UPSERT",
"locationUri": "string",
"target": "string"
}
  • locationUri - URI pointing to the source table:
    • Snowflake: snowflake://database.schema.table or snowflake://schema.table (uses current database)
    • Aurora/PostgreSQL: aurora://schema.table or postgres://schema.table
  • target - Fully-qualified name of the target schema (optional, defaults to all schemas in dataset)
Requirements
  • Source and target tables must be in the same database/account
  • The target table structure is defined by the LinkableSchema, not inferred from the source
  • Column mapping is automatic based on schema property names
  • Works with structured properties, arrays, and nested objects (same as S3 import)

Example

Import data from one table to another:

{
"actions": [
{
"operation": "UPSERT",
"locationUri": "snowflake://MY_DB.MY_SCHEMA.SOURCE_TABLE",
"target": "target_schema_name"
}
]
}

Upsert action (from external table)

This action type upserts data from an external table defined in your dataset schema. External tables reference data stored outside the database (e.g., in S3, Azure Blob Storage) but can be queried like regular tables.

Syntax

{
"operation": "UPSERT",
"source": "name",
"target": "name"
}
  • source - Fully-qualified name of the external table schema
  • target - Fully-qualified name of the target object schema (optional, defaults to all schemas in dataset)
Requirements
  • The source must be an external table schema defined in your dataset
  • Column mapping follows the same rules as other UPSERT operations
  • Works with both Snowflake and Aurora/PostgreSQL external table definitions

Example

Import data from an external table:

{
"actions": [
{
"operation": "UPSERT",
"source": "myschema.ExternalSource",
"target": "myschema.TargetObject"
}
]
}

Upsert action (from other schema)

This action works like the upsert from S3 location action, except that only a single object schema is targeted, and data is copied from another schema (either an object or a view), optionally using a filter pre-defined on that source schema.

Syntax

{
"operation": "UPSERT",
"target": "name",
"source": "name",
"query": "name",
"arguments": "map of string, any"
}
  • target - Fully-qualified name of the object schema to copy to
  • source - Fully-qualified name of the object or view schema to copy from
  • query - The (optional) name of a query defined on the source schema to act as a filter
  • arguments - Key-value arguments to the query, if specified (optional)
  • sourceDatasetId - If specified the dataset id that owns the source schema (optional)

Example

With the following schema:

petstore.Dog:
  properties:
    name:
      type: string

petstore.DogChange:
  properties:
    status:
      type: string
    name:
      type: string
  queries:
    withStatus:
      arguments:
        - name: inputStatus
          type: string
      expression: status == inputStatus

And the following existing data:

petstore.Dog

id   | version | name
dog1 | 1       | Scooby Doo

petstore.DogChange

id   | status  | name
dog1 | ready   | Scooby
dog2 | ready   | Lassie
dog3 | pending | Clifford

Then executing the following transaction:

{
"actions": [
{
"operation": "UPSERT",
"target": "petstore.Dog",
"source": "petstore.DogChange",
"query": "withStatus",
"arguments": {
"inputStatus": "ready"
}
}
]
}

Will result in the following:

id   | version | name
dog1 | 2       | Scooby
dog2 | 1       | Lassie

When the source schema exists in the same dataset as the target schema, or in a dataset that the target schema's dataset depends on, there is no need to specify sourceDatasetId. Otherwise, sourceDatasetId can be the id or alias of a dataset in the environment.

Patch action

The PATCH action works similarly to the UPSERT action, except that it only updates existing records, and does not create new records. Furthermore, it only updates the columns specified in the input data, leaving all other columns unchanged. The id field must be present in the source data, and must match an existing record in the target data.

Performance Improvement

Since Depot 9.9.0, PATCH operations use a changeset-based transaction approach internally, resulting in improved performance for batch updates.

You can use most schema types as a source (object, view or query), and you can also use file input, provided the source contains the id field and any columns you wish to alter.

caution

It is not currently possible to restrict a PATCH to a chosen list of columns from the source schema. Any column present in the source (except version, created, updated, schema and hash for object sources) will be used to update the target.

tip

You can achieve the "projection" effect by creating a query schema that selects only the columns you intend to patch from your actual source, and using that schema as the source.

Example

With the following schema:

petstore.Dog:
  type: object
  properties:
    name:
      type: string
    age:
      type: integer

petstore.NextDogAge:
  type: query
  properties:
    id:
      type: string!
    age:
      type: integer
  sql: SELECT id, age + 1 as age FROM DOG WHERE age IS NOT NULL
  using:
    DOG: petstore.Dog

And the following existing data:

petstore.Dog

id   | version | name       | age
dog1 | 1       | Scooby Doo | 5
dog2 | 1       | Médor      | 3
dog3 | 1       | Corniaud   | null

Then executing the following transaction:

{
"actions": [
{
"operation": "PATCH",
"target": "petstore.Dog",
"source": "petstore.NextDogAge"
}
]
}

will result in the following data:

id   | version | name       | age
dog1 | 2       | Scooby Doo | 6
dog2 | 2       | Médor      | 4
dog3 | 1       | Corniaud   | null
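The PATCH example above can be reproduced with a small in-memory model. This is a sketch, not Depot's changeset implementation: the patch function is hypothetical, and skipping source ids that have no match in the target is an assumption (the text only states that ids must match existing records).

```python
def patch(target, source_rows):
    """Model PATCH: update only the supplied columns of existing rows."""
    result = {key: dict(row) for key, row in target.items()}
    for row in source_rows:
        current = result.get(row["id"])
        if current is None:
            continue  # PATCH never creates records (skipping is an assumption)
        changes = {k: v for k, v in row.items() if k != "id" and current.get(k) != v}
        if changes:
            current.update(changes)  # columns absent from the source stay untouched
            current["version"] += 1
    return result


dogs = {
    "dog1": {"id": "dog1", "version": 1, "name": "Scooby Doo", "age": 5},
    "dog2": {"id": "dog2", "version": 1, "name": "Médor", "age": 3},
    "dog3": {"id": "dog3", "version": 1, "name": "Corniaud", "age": None},
}
# Stand-in for petstore.NextDogAge: id and age + 1 for dogs with a known age.
next_dog_age = [
    {"id": d["id"], "age": d["age"] + 1} for d in dogs.values() if d["age"] is not None
]
patched = patch(dogs, next_dog_age)
for row in patched.values():
    print(row["id"], row["version"], row["name"], row["age"])
```

dog3 keeps version 1 because the query filters it out, so the source has no row for it.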

Delete action

This action type deletes data from all object schemas where there is a matching id in the input location.

Syntax

{
"operation": "DELETE",
"locationUri": "string",
"format": "string"
}
  • locationUri - S3 path to a folder containing data laid out by schema fully-qualified name
  • format - input format, see Formats section

Example

Given the following schema:

petstore.Dog:
  properties:
    name:
      type: string

And the following pre-existing data:

id   | version | name
dog1 | 2       | Scooby
dog2 | 1       | Lassie
dog3 | 1       | Clifford

And the following data in the S3 location s3://petstore/ingest2/petstore/Dog/anyfile.json:

{
"id": "dog3"
}

Then executing the following transaction:

{
"actions": [
{
"operation": "DELETE",
"locationUri": "s3://petstore/ingest2",
"format": "JSON"
}
]
}

Will result in the following:

id   | version | name
dog1 | 2       | Scooby
dog2 | 1       | Lassie

You can also issue a DELETE transaction that deletes multiple items by 'pointing at yourself'. Example:

{
"operation": "DELETE",
"source": "my.Entity",
"target": "my.Entity",
"query": "byFkId",
"arguments": {
"idArg": "?"
}
}

For a dataset backing location (storage engine) that supports it, this expands to SQL like the following:

DELETE
FROM MY_ENTITY
WHERE ID IN (SELECT ID FROM MY_ENTITY WHERE FK_ID = ?)

Refresh action

Non-trivial materialized views must be refreshed before changes in their input become visible. This action provides that functionality, either for the whole view or for a pre-defined subset.

Syntax

{
"operation": "REFRESH",
"target": "name",
"query": "name",
"arguments": "map of string, any"
}
  • target - Fully-qualified name of the view schema to refresh
  • query - The (optional) name of a query defined on the source type to act as a filter
  • arguments - Key-value arguments to the query, if specified (optional)

Example

Given the following schemas:

petstore.DogChange:
  properties:
    status:
      type: string
    name:
      type: string
  queries:
    withStatus:
      arguments:
        - name: inputStatus
          type: string
      expression: status == inputStatus

petstore.DogChangeStats:
  type: view
  materialized: true
  sql: SELECT STATUS AS ID, COUNT(*) AS TOTAL FROM @{dogChange} GROUP BY STATUS
  using:
    dogChange: petstore.DogChange
  primaryKey:
    - id
  properties:
    id:
      type: string
    total:
      type: integer

And the following existing data:

petstore.DogChange

id   | status  | name
dog1 | ready   | Scooby
dog2 | ready   | Lassie
dog3 | pending | Clifford

petstore.DogChangeStats

id      | total
ready   | 5
pending | 4

Then executing the following transaction:

{
"actions": [
{
"operation": "REFRESH",
"target": "petstore.DogChangeStats",
"query": "withStatus",
"arguments": {
"inputStatus": "ready"
}
}
]
}

Will result in the following:

id      | total
ready   | 2
pending | 4

The pending total is not yet updated, because the query only targets the ready status. To refresh all view rows, execute the following transaction:

{
"actions": [
{
"operation": "REFRESH",
"target": "petstore.DogChangeStats"
}
]
}
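The difference between the filtered and the full refresh can be sketched in Python. The refresh function below is a hypothetical in-memory stand-in for the DogChangeStats view (a count per status); how Depot treats stale groups during a filtered refresh is not described here, so the sketch simply leaves untouched groups as they are.

```python
from collections import Counter


def refresh(view, source_rows, row_filter=None):
    """Recompute per-status totals, for all rows or only those matching row_filter."""
    rows = [r for r in source_rows if row_filter is None or row_filter(r)]
    totals = Counter(r["status"] for r in rows)
    if row_filter is None:
        return dict(totals)  # full refresh: the view is rebuilt from scratch
    refreshed = dict(view)  # filtered refresh: untouched groups keep old values
    refreshed.update(totals)
    return refreshed


dog_changes = [
    {"id": "dog1", "status": "ready", "name": "Scooby"},
    {"id": "dog2", "status": "ready", "name": "Lassie"},
    {"id": "dog3", "status": "pending", "name": "Clifford"},
]
stale_view = {"ready": 5, "pending": 4}

partial = refresh(stale_view, dog_changes, lambda r: r["status"] == "ready")
full = refresh(stale_view, dog_changes)
print(partial)
print(full)
```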

Merge action

The merge action takes data from a source (object or view schema) and merges it into a target object schema based on a named query. It is very similar to an upsert from another schema, except that rows that match the query in the target but do not exist when that same filter is applied to the source are deleted from the target.

Version history is preserved for all object schemas, so this functionality may be used to maintain the history of views.

Syntax

{
"operation": "MERGE",
"source": "name",
"target": "name",
"query": "name",
"arguments": "map of string, any"
}
  • target - Fully-qualified name of the object schema to merge to
  • source - Fully-qualified name of the object or view schema to merge from
  • query - The (optional) name of a query defined on the source schema to act as a filter
  • arguments - Key-value arguments to the query, if specified (optional)
  • sourceDatasetId - If specified the dataset id that owns the source schema (optional)

Note: The source schema must include a unique id property to be usable in a merge action.

Example

Given the following schemas:

petstore.DogChangeStats:
  type: view
  materialized: true
  sql: SELECT STATUS AS ID, COUNT(*) AS TOTAL FROM @{dogChange} GROUP BY STATUS
  using:
    dogChange: petstore.DogChange
  primaryKey:
    - id
  properties:
    id:
      type: string
    total:
      type: integer

petstore.DogChangeStatsAudit:
  type: object
  properties:
    id:
      type: string
    total:
      type: integer

And the following existing data:

petstore.DogChangeStats

id    | total
ready | 5

petstore.DogChangeStatsAudit

id      | version | total
ready   | 1       | 2
pending | 1       | 5

Then executing the following transaction:

{
"actions": [
{
"operation": "MERGE",
"source": "petstore.DogChangeStats",
"target": "petstore.DogChangeStatsAudit"
}
]
}

Will result in the following data:

petstore.DogChangeStatsAudit

id    | version | total
ready | 2       | 5

That is, the ready record is updated and a new version is stored, but the pending record is removed because it did not exist in the source. A named query may also be used to target only a specific subset of the data in the source and target.

When the source schema exists in the same dataset as the target schema, or in a dataset that the target schema's dataset depends on, there is no need to specify sourceDatasetId. Otherwise, sourceDatasetId can be the id or alias of a dataset in the environment.
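The merge semantics described in this section can be modelled as upsert plus deletion of in-scope target rows that are missing from the source. The following Python sketch is illustrative only: merge and its matches parameter (standing in for the named query) are hypothetical, and history storage is not modelled.

```python
def merge(target, source_rows, matches=lambda row: True):
    """Model MERGE on a dict of rows keyed by id; matches plays the named query."""
    source = {r["id"]: r for r in source_rows if matches(r)}
    result = {}
    for key, row in target.items():
        if matches(row) and key not in source:
            continue  # in scope in the target but absent from the source: deleted
        result[key] = dict(row)
    for key, row in source.items():
        current = result.get(key)
        if current is None:
            result[key] = {**row, "version": 1}
        elif any(current.get(k) != v for k, v in row.items()):
            result[key] = {**row, "version": current["version"] + 1}
    return result


stats = [{"id": "ready", "total": 5}]
audit = {
    "ready": {"id": "ready", "version": 1, "total": 2},
    "pending": {"id": "pending", "version": 1, "total": 5},
}
merged = merge(audit, stats)
print(merged)
```

Passing a narrower matches predicate limits both the upserts and the deletions to that subset, which is the role the named query plays in the action.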

Replace action

The replace action is similar to the merge action, but optimised for partitioned updates of batch object data. To achieve the performance optimisation, it loses the ability to individually version each row and has other small caveats.

The replace action takes data from some source (object or view schema), and replaces with it any corresponding entries in the target object. The action is normally scoped by some batch ID, which is available on every record in the source and is part of the data logical ID (or it can be ensured that each entry ID belongs to just a single batch).

The replacement creates tombstone history entries for each record that exists in the target, but no longer exists in the source. The operation then deletes all records matching the batch ID, and inserts all records from the source (which is also scoped by the same batch ID).

The operation handles versioning/history in a batch-update manner:

  • The version of the replaced entries is always increased, even if the data in target and source is identical.
  • The next version is taken as the maximum existing version + 10.
    • This is necessary to accommodate partial tombstone entries written by an earlier merge action (and improves readability compared with a +2 bump).
    • If history is disabled, the next version is just the maximum existing version + 1.
  • The updated timestamp is always set to the transaction time.
  • The created timestamp is taken as the minimum of created value for the current entries in the target for the given batch.

These versioning "shortcuts" allow the replace action to be implemented without checks/joins for individual entries, thus achieving much better performance, while still satisfying the business requirements for batch-updated data.
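The versioning rules listed above can be sketched as follows. This is a simplified model under stated assumptions, not the actual implementation: replace_batch is a hypothetical name, the maximum version and minimum created value are taken over the target rows in the batch, and the fallback used when the batch has no existing rows is a guess.

```python
def replace_batch(target_batch, source_batch, now, history_enabled=True):
    """Apply the batch versioning rules to the rows of one batch (illustrative)."""
    bump = 10 if history_enabled else 1
    # One version for the whole batch: maximum existing version plus the bump.
    next_version = max((r["version"] for r in target_batch), default=0) + bump
    # One created timestamp for the whole batch: the minimum existing value.
    created = min((r["created"] for r in target_batch), default=now)
    source_ids = {r["id"] for r in source_batch}
    tombstones = [
        {**r, "version": next_version, "updated": now, "__deleted": True}
        for r in target_batch
        if r["id"] not in source_ids
    ]
    replaced = [
        {**r, "version": next_version, "created": created, "updated": now}
        for r in source_batch
    ]
    return replaced, tombstones


# Hypothetical sample batch, not taken from this page.
target_rows = [
    {"id": "a", "name": "A", "version": 1, "created": "2023-01-01", "updated": "2023-01-01"},
    {"id": "b", "name": "B", "version": 1, "created": "2023-01-02", "updated": "2023-01-02"},
]
source_rows = [{"id": "a", "name": "A2"}, {"id": "c", "name": "C"}]
replaced, tombstones = replace_batch(target_rows, source_rows, now="2023-02-01")
print(replaced)
print(tombstones)
```

Because every row in the batch receives the same version and timestamps, no per-row comparison or join is needed, which is the performance trade-off the section describes.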

Syntax

{
"operation": "REPLACE",
"source": "name",
"target": "name",
"query": "name",
"arguments": "map of string, any"
}
  • target - Fully-qualified name of the object schema to replace entries in
  • source - Fully-qualified name of the object or view schema to replace entries from
  • query - The name of a query defined on the source schema to act as a filter. Optional, but highly recommended to indicate the partition batch scope as otherwise the whole table will be replaced.
  • arguments - Key-value arguments to the query, if specified (optional)
  • sourceDatasetId - If specified the dataset id that owns the source schema (optional)

Note: The source schema must include a unique id property to be usable in a replace action. The query arguments must partition the data into batches such that each id belongs to exactly one batch.

Example

Given the schemas:

petstore.DogWithYear:
  type: object
  properties:
    name:
      type: string
    yearOfBirth:
      type: integer
      required: true

petstore.AdoptedDogs:
  type: view
  sql: 'SELECT ID, NAME, YEAR_OF_BIRTH, ''adopted'' AS STATUS FROM @{dog}'
  using:
    dog: petstore.DogWithYear
  properties:
    id:
      type: string
      required: true
    name:
      type: string
    yearOfBirth:
      type: integer
      required: true
    status:
      type: string
  queries:
    byYear:
      arguments:
        - name: inputYear
          type: integer
      expression: yearOfBirth == inputYear

petstore.DogYearStatus:
  type: object
  properties:
    name:
      type: string
    yearOfBirth:
      type: integer
      required: true
    status:
      type: string

And the following existing data:

petstore.DogWithYear

id | name    | yearOfBirth
1  | Toodles | 2021
2  | Geoff   | 2020
3  | Minnie  | 2021
4  | Jimmie  | 2021

petstore.DogYearStatus

id | name    | yearOfBirth | status  | version | created    | updated
1  | Toodles | 2021        | arrived | 1       | 2023-01-01 | 2023-01-01
2  | Geoff   | 2020        | arrived | 1       | 2023-01-01 | 2023-01-01
4  | Jimmie  | 2021        | adopted | 2       | 2023-01-02 | 2023-01-02
5  | Raffles | 2021        | unknown | 1       | 2023-01-02 | 2023-01-02

Then executing the following transaction:

{
"actions": [
{
"operation": "REPLACE",
"target": "petstore.DogYearStatus",
"source": "petstore.AdoptedDogs",
"query": "byYear",
"arguments": {
"inputYear": 2021
}
}
]
}

Will result in the following:

petstore.DogYearStatus

id | name    | yearOfBirth | status  | version | created    | updated
1  | Toodles | 2021        | adopted | 11      | 2023-01-01 | now
2  | Geoff   | 2020        | arrived | 1       | 2023-01-01 | now
3  | Minnie  | 2021        | adopted | 11      | 2023-01-01 | now
4  | Jimmie  | 2021        | adopted | 11      | 2023-01-01 | now

Note the changed created timestamp and uniform version/updated bump. The newly created Minnie entry inherits the min created timestamp of the whole batch.

Furthermore, the tombstone history entry for the deleted Raffles is created with the original data:

id | name    | yearOfBirth | status  | version | created    | updated | __deleted
5  | Raffles | 2021        | unknown | 11      | 2023-01-02 | now     | true

Insert ignore action

The insert ignore action has the same syntax and usage as the UPSERT action, however if data already exists with the same id in the target, it will not be replaced.

Syntax

{
"operation": "INSERT_IGNORE",
"locationUri": "string",
"format": "string"
}

or

{
"operation": "INSERT_IGNORE",
"source": "name",
"target": "name",
"query": "string",
"arguments": "map of string, any"
}

Available configuration properties are the same as for upsert from S3 location and upsert from other schema respectively.

Example

With the following schema:

petstore.Dog:
  properties:
    name:
      type: string

petstore.DogChange:
  properties:
    name:
      type: string

And the following existing data:

petstore.Dog

id   | version | name
dog1 | 1       | Scooby Doo

petstore.DogChange

id   | name
dog1 | Scooby
dog2 | Lassie
dog3 | Clifford

Then executing the following transaction:

{
"actions": [
{
"operation": "INSERT_IGNORE",
"target": "petstore.Dog",
"source": "petstore.DogChange"
}
]
}

Will result in the following:

id   | version | name
dog1 | 1       | Scooby Doo
dog2 | 1       | Lassie
dog3 | 1       | Clifford
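For comparison with UPSERT, the insert ignore behaviour in the example above can be sketched in a few lines of Python. The insert_ignore function is a hypothetical in-memory model, not Depot code.

```python
def insert_ignore(target, incoming):
    """Insert rows whose id is new; leave rows with an existing id untouched."""
    result = {key: dict(row) for key, row in target.items()}
    for record in incoming:
        if record["id"] not in result:
            result[record["id"]] = {**record, "version": 1}
    return result


dogs = {"dog1": {"id": "dog1", "version": 1, "name": "Scooby Doo"}}
dog_changes = [
    {"id": "dog1", "name": "Scooby"},
    {"id": "dog2", "name": "Lassie"},
    {"id": "dog3", "name": "Clifford"},
]
inserted = insert_ignore(dogs, dog_changes)
for row in inserted.values():
    print(row["id"], row["version"], row["name"])
```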

Insert record action

The insert record action has the same syntax and usage as the UPSERT action, except that the target must be a record schema. Such a transaction is very fast because it copies/inserts directly into the target; however, it does not add any meta fields and does not support expression fields.

Syntax

{
"operation": "INSERT_RECORD",
"locationUri": "string",
"format": "string"
}

Delete record action

The delete record action takes a record schema as its target. A named query must be provided, along with its arguments if the query requires any.

Syntax

{
"operation": "DELETE_RECORD",
"target": "string",
"query": "string",
"arguments": {
"Optional": "map"
}
}

The delete record action currently does not support sourcing the records to be deleted from a file or from another table or view. Contributions are welcome.

Using queries in transactions

It is possible to define complex queries with argument replacement at arbitrary points in the query (rather than just in a synthetic where clause). These queries can be used anywhere a named query can be used.

Example

With the following schema:

petstore.Dog:
  properties:
    name:
      type: string

petstore.DogQuery:
  type: query
  sql: SELECT * FROM @{dog} WHERE name=${name}
  using:
    dog: petstore.Dog
  arguments:
    - name: name
      type: string
  properties:
    name:
      type: string

And the following existing data:

petstore.Dog

id   | version | name
dog1 | 1       | Scooby Doo
dog2 | 1       | Lassie

Then executing the following transaction:

{
"actions": [
{
"operation": "UPSERT",
"target": "petstore.Dog",
"source": "petstore.DogQuery",
"arguments": {
"name": "Lassie"
}
}
]
}

Will result in the following:

id   | version | name
dog2 | 1       | Lassie

Formats

The following input formats are supported:

JSON

Input data is in JSON-lines format. See: jsonlines.org.
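In JSON-lines, each record is a complete JSON document on its own line. A minimal Python sketch for producing and reading such a file body follows; the helper names are illustrative, and uploading the result to the S3 location is left out.

```python
import json

records = [
    {"id": "dog1", "name": "Scooby"},
    {"id": "dog2", "name": "Lassie"},
    {"id": "dog3", "name": "Clifford"},
]


def to_json_lines(rows):
    """Serialize each row as one compact JSON document per line."""
    return "".join(json.dumps(row, separators=(",", ":")) + "\n" for row in rows)


def from_json_lines(text):
    """Parse JSON-lines text back into a list of rows, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


file_body = to_json_lines(records)
print(file_body, end="")
```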

The default columnFormat for this format is AS_SPECIFIED.

PARQUET

Input data is in Parquet files. The locationUri in this case should point to a parent folder, and all files in that folder will be merged.

The default columnFormat for this format is AS_SPECIFIED.

DMS

Input data is in Parquet DMS files. In this case the Op column gets special handling: when Op = 'D' for a given record, the target row is deleted.

The default columnFormat for this format is UPPER.

Column formats

columnFormat allows enforcing a specific case for field names in the data file - if data fields do not match the specified format, they will be ignored. The following column formats are supported:

note

When running tests with local files, these rules are relaxed, and any variant of field name case in the input is supported as long as it is not ambiguous.

AS_SPECIFIED

Data in the input location should have field names that match the field names in the schema

myField in the schema should be myField in the input data.

UPPER

Data in the input location should have field names that are uppercase variants of the field names in the schema

myField in the schema should be MYFIELD in the input data.

LOWER

Data in the input location should have field names that are lowercase variants of the field names in the schema

myField in the schema should be myfield in the input data.

UPPER_SNAKE

Data in the input location should have field names that are upper snake case variants of the field names in the schema

myField in the schema should be MY_FIELD in the input data.

LOWER_SNAKE

Data in the input location should have field names that are lower snake case variants of the field names in the schema

myField in the schema should be my_field in the input data.

UPPER_CAMEL

Data in the input location should have field names that are upper camel case variants of the field names in the schema

myField in the schema should be MyField in the input data.

LOWER_CAMEL

Data in the input location should have field names that are lower camel case variants of the field names in the schema

MyField in the schema should be myField in the input data.
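The naming rules above can be summarised as a small Python function that derives the expected input field name from a schema field name. This is a sketch based only on the examples in this section: expected_column and split_words are hypothetical helpers, and the word-splitting regular expression is an assumption about how names are broken into words.

```python
import re


def split_words(name):
    """Split a camelCase or PascalCase name into lowercase words (assumed rule)."""
    return [w.lower() for w in re.findall(r"[A-Z]?[a-z0-9]+|[A-Z]+(?![a-z])", name)]


def expected_column(name, column_format):
    """Return the field name the given columnFormat expects in the input data."""
    words = split_words(name)
    if column_format == "AS_SPECIFIED":
        return name
    if column_format == "UPPER":
        return name.upper()
    if column_format == "LOWER":
        return name.lower()
    if column_format == "UPPER_SNAKE":
        return "_".join(words).upper()
    if column_format == "LOWER_SNAKE":
        return "_".join(words)
    if column_format == "UPPER_CAMEL":
        return "".join(w.capitalize() for w in words)
    if column_format == "LOWER_CAMEL":
        return words[0] + "".join(w.capitalize() for w in words[1:])
    raise ValueError(f"unknown columnFormat: {column_format}")


for fmt in ("AS_SPECIFIED", "UPPER", "LOWER", "UPPER_SNAKE", "LOWER_SNAKE", "UPPER_CAMEL"):
    print(fmt, expected_column("myField", fmt))
print("LOWER_CAMEL", expected_column("MyField", "LOWER_CAMEL"))
```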

Column mappings

columnMapping provides the ability to map data file field names to table columns. This can be used when some fields do not match the provided column format, or to rename a field during the operation.

For example:

{
"columnMapping": {
"myField": "FIELD_NAME",
"secondField": "oP"
}
}

If the column mapping above is passed in a transaction action that loads data from a file, the action looks for FIELD_NAME in the data file and maps it to the myField property, and looks for oP and maps it to secondField. All other fields are looked up using the provided columnFormat.
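The lookup order described above (explicit mapping first, then columnFormat) can be sketched as follows. The functions and the thirdField property are hypothetical and used only for illustration; format_name stands in for whichever columnFormat rule applies, here UPPER.

```python
def resolve_columns(properties, column_mapping, format_name=lambda prop: prop):
    """Return {schema property: field name to look up in the data file}."""
    return {prop: column_mapping.get(prop, format_name(prop)) for prop in properties}


def read_record(raw, properties, column_mapping, format_name=lambda prop: prop):
    """Pick schema properties out of one raw input record; unmatched fields are ignored."""
    lookup = resolve_columns(properties, column_mapping, format_name)
    return {prop: raw[col] for prop, col in lookup.items() if col in raw}


column_mapping = {"myField": "FIELD_NAME", "secondField": "oP"}
properties = ["myField", "secondField", "thirdField"]  # thirdField is hypothetical
raw = {"FIELD_NAME": 1, "oP": "I", "THIRDFIELD": True, "ignored": 0}

lookup = resolve_columns(properties, column_mapping, str.upper)
record = read_record(raw, properties, column_mapping, str.upper)
print(lookup)
print(record)
```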

Implicit, Sequence and Expression-defined id fields

Depot supports object schemas where the id field is one of:

  • a string (the default), which Depot populates with a UUID when feasible
  • a value pulled from a sequence
  • a value computed as an expression from other fields

Depot's transaction engine currently has the following limitations:

  • UUID generation for missing id fields is currently implemented in the Unitary API engine (REST, GQL, Lambda) but not in transactions; the source provider is responsible for populating all id fields.
    • A ticket for implementing this feature in the transaction engine is booked as DPT-2481.
  • Computing an expression based on other fields is fully supported in the Unitary API engine (REST, GQL, Lambda), but in the transaction engine it is limited to what the Basestar expression to SQL expression translation supports.
    • DPT-2481 suggests an approach to broaden translations using system UDFs; this will require further ticketing and implementation.

Furthermore:

  • Pulling from a sequence to populate an id field may produce surprising results in the MERGE and DELETE operations, due to the interaction between the MERGE and DELETE semantics and the fact that the sequence values are not known in advance.