Transactions
A transaction is a bulk data update, consisting of a series of operations performed against a given dataset. A transaction is isolated to a dataset, and either all actions succeed, or all actions fail and no changes are made.
To run a transaction, issue a request to the dataset's transaction endpoint, with the following syntax:
```json
{
  "actions": [
    "list of action objects"
  ]
}
```
Where each action object may be one of the following types:
Upsert action (from S3 location)
This action type upserts (i.e. creates or updates) records in all object schemas of the dataset, using data made available in S3. JSON and Parquet formats are supported.
Syntax
```json
{
  "operation": "UPSERT",
  "locationUri": "string",
  "format": "string"
}
```
- `locationUri` - S3 path to a folder containing data laid out by schema fully-qualified name
- `format` - input format, see the Formats section
In a Snowflake transaction, a single Snowflake stage must exist and be accessible, providing access to the specified location; see https://docs.snowflake.com/en/sql-reference/ddl-stage.html#stage-management.
Ideally, `locationUri` should be exact, with no trailing slash at the end of S3 locations.
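The expected folder layout can be illustrated with a small helper that recovers the schema name from a file path. This is a sketch under the assumption, taken from the example below, that the path segments between `locationUri` and the file name are the dot-separated parts of the schema fully-qualified name; `schema_for_key` is hypothetical and not part of Depot.

```python
def schema_for_key(location_uri: str, object_uri: str) -> str:
    """Derive a schema fully-qualified name from a file path under location_uri.

    Hypothetical helper (not a Depot API). Assumes the layout
    <location_uri>/<namespace parts>/<SchemaName>/<file name>.
    """
    # Tolerate a trailing slash on the location, then require the prefix.
    prefix = location_uri.rstrip("/") + "/"
    if not object_uri.startswith(prefix):
        raise ValueError(f"{object_uri} is not under {location_uri}")
    # Remaining segments: schema name parts followed by the file name.
    parts = object_uri[len(prefix):].split("/")
    if len(parts) < 2:
        raise ValueError(f"{object_uri} has no schema folder")
    return ".".join(parts[:-1])


print(schema_for_key("s3://petstore/ingest1",
                     "s3://petstore/ingest1/petstore/Dog/anyfile.json"))
# prints: petstore.Dog
```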
Example
Assuming the following schema:
```yaml
petstore.Dog:
  properties:
    name:
      type: string
```
And the following pre-existing data:
| id | version | name |
|---|---|---|
| dog1 | 1 | Scooby Doo |
And the following data in the S3 location s3://petstore/ingest1/petstore/Dog/anyfile.json
```json
{"id": "dog1", "name": "Scooby"}
{"id": "dog2", "name": "Lassie"}
{"id": "dog3", "name": "Clifford"}
```
Then executing the following transaction:
```json
{
  "actions": [
    {
      "operation": "UPSERT",
      "locationUri": "s3://petstore/ingest1",
      "format": "JSON"
    }
  ]
}
```
Will result in the following:
| id | version | name |
|---|---|---|
| dog1 | 2 | Scooby |
| dog2 | 1 | Lassie |
| dog3 | 1 | Clifford |
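The upsert semantics in this example can be modelled in memory as follows. This is a simplified sketch, not Depot's implementation: it assumes that an upsert fully replaces an existing row and always bumps its version by one, and it ignores meta fields such as `created` and `updated`.

```python
from typing import Dict, List

Row = Dict[str, object]


def upsert(target: Dict[str, Row], incoming: List[Row]) -> Dict[str, Row]:
    """Model of UPSERT on one object schema, keyed by id.

    New ids are inserted at version 1; existing ids are fully replaced
    by the incoming row and their version is bumped by 1.
    """
    result = {rid: dict(row) for rid, row in target.items()}  # leave input untouched
    for row in incoming:
        previous = result.get(row["id"])
        version = 1 if previous is None else previous["version"] + 1
        result[row["id"]] = {**row, "version": version}
    return result


existing = {"dog1": {"id": "dog1", "version": 1, "name": "Scooby Doo"}}
ingest = [
    {"id": "dog1", "name": "Scooby"},
    {"id": "dog2", "name": "Lassie"},
    {"id": "dog3", "name": "Clifford"},
]
after = upsert(existing, ingest)
for row in after.values():
    print(row)
```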
Upsert action (from table to table)
This action type upserts data directly from one table to another within the same database or account, without requiring intermediate S3 storage. This feature is available for Snowflake and Aurora/PostgreSQL locations.
Syntax
```json
{
  "operation": "UPSERT",
  "locationUri": "string",
  "target": "string"
}
```
- `locationUri` - URI pointing to the source table:
  - Snowflake: `snowflake://database.schema.table` or `snowflake://schema.table` (uses the current database)
  - Aurora/PostgreSQL: `aurora://schema.table` or `postgres://schema.table`
- `target` - Fully-qualified name of the target schema (optional, defaults to all schemas in dataset)
- Source and target tables must be in the same database/account
- The target table structure is defined by the LinkableSchema, not inferred from the source
- Column mapping is automatic based on schema property names
- Works with structured properties, arrays, and nested objects (same as S3 import)
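The accepted `locationUri` forms can be checked with a small parser. `parse_table_uri` and `TableRef` are hypothetical names; the sketch accepts only the forms listed above and assumes the three-part form is Snowflake-only.

```python
from typing import NamedTuple, Optional


class TableRef(NamedTuple):
    engine: str
    database: Optional[str]  # None means "use the current database"
    schema: str
    table: str


def parse_table_uri(uri: str) -> TableRef:
    """Parse snowflake://db.schema.table, snowflake://schema.table,
    aurora://schema.table or postgres://schema.table (hypothetical helper)."""
    engine, sep, rest = uri.partition("://")
    if not sep:
        raise ValueError(f"not a table URI: {uri}")
    names = rest.split(".")
    if engine == "snowflake" and len(names) == 3:
        return TableRef(engine, names[0], names[1], names[2])
    if engine in ("snowflake", "aurora", "postgres") and len(names) == 2:
        return TableRef(engine, None, names[0], names[1])
    raise ValueError(f"unsupported table URI: {uri}")


print(parse_table_uri("snowflake://MY_DB.MY_SCHEMA.SOURCE_TABLE"))
```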
Example
Import data from one table to another:
```json
{
  "actions": [
    {
      "operation": "UPSERT",
      "locationUri": "snowflake://MY_DB.MY_SCHEMA.SOURCE_TABLE",
      "target": "target_schema_name"
    }
  ]
}
```
Upsert action (from external table)
This action type upserts data from an external table defined in your dataset schema. External tables reference data stored outside the database (e.g., in S3, Azure Blob Storage) but can be queried like regular tables.
Syntax
```json
{
  "operation": "UPSERT",
  "source": "name",
  "target": "name"
}
```
- `source` - Fully-qualified name of the external table schema
- `target` - Fully-qualified name of the target object schema (optional, defaults to all schemas in dataset)
- The source must be an external table schema defined in your dataset
- Column mapping follows the same rules as other UPSERT operations
- Works with both Snowflake and Aurora/PostgreSQL external table definitions
Example
Import data from an external table:
```json
{
  "actions": [
    {
      "operation": "UPSERT",
      "source": "myschema.ExternalSource",
      "target": "myschema.TargetObject"
    }
  ]
}
```
Upsert action (from other schema)
This action works like the upsert from S3 location action, except that only a single object schema is targeted, and data is copied from another schema (either an object or a view), optionally filtered by a query pre-defined on that source schema.
Syntax
```json
{
  "operation": "UPSERT",
  "target": "name",
  "source": "name",
  "query": "name",
  "arguments": "map of string, any",
  "sourceDatasetId": "string"
}
```
- `target` - Fully-qualified name of the object schema to copy to
- `source` - Fully-qualified name of the object or view schema to copy from
- `query` - The (optional) name of a query defined on the source schema to act as a filter
- `arguments` - Key-value arguments to the query, if specified (optional)
- `sourceDatasetId` - If specified, the dataset id that owns the `source` schema (optional)
Example
With the following schema:
```yaml
petstore.Dog:
  properties:
    name:
      type: string
petstore.DogChange:
  properties:
    status:
      type: string
    name:
      type: string
  queries:
    withStatus:
      arguments:
        - name: inputStatus
          type: string
      expression: status == inputStatus
```
And the following existing data:
petstore.Dog
| id | version | name |
|---|---|---|
| dog1 | 1 | Scooby Doo |
petstore.DogChange
| id | status | name |
|---|---|---|
| dog1 | ready | Scooby |
| dog2 | ready | Lassie |
| dog3 | pending | Clifford |
Then executing the following transaction:
```json
{
  "actions": [
    {
      "operation": "UPSERT",
      "target": "petstore.Dog",
      "source": "petstore.DogChange",
      "query": "withStatus",
      "arguments": {
        "inputStatus": "ready"
      }
    }
  ]
}
```
Will result in the following:
| id | version | name |
|---|---|---|
| dog1 | 2 | Scooby |
| dog2 | 1 | Lassie |
When the source schema exists in the same dataset as the target schema, or in a dependency of the target schema's dataset, there is no need to specify `sourceDatasetId`. Otherwise, set `sourceDatasetId` to the id or alias of the source dataset in the environment.
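The filter-then-upsert behaviour of the example above can be sketched in memory. The query is modelled as a Python predicate, and the sketch assumes that only `id` plus the properties defined on the target schema are copied (so `status` is dropped when writing to `petstore.Dog`); none of these helper names are Depot APIs.

```python
from typing import Callable, Dict, Iterable, List, Optional

Row = Dict[str, object]


def upsert_from_schema(target: Dict[str, Row], source: Iterable[Row],
                       target_props: List[str],
                       query: Optional[Callable[[Row], bool]] = None) -> Dict[str, Row]:
    """Model of UPSERT from another schema: filter, project, then upsert."""
    result = {rid: dict(row) for rid, row in target.items()}
    for row in source:
        if query is not None and not query(row):
            continue  # row excluded by the named query
        # Assumption: only id plus properties defined on the target are copied.
        copied = {"id": row["id"], **{p: row.get(p) for p in target_props}}
        previous = result.get(row["id"])
        copied["version"] = 1 if previous is None else previous["version"] + 1
        result[row["id"]] = copied
    return result


def with_status(input_status: str) -> Callable[[Row], bool]:
    """Python stand-in for the withStatus query: status == inputStatus."""
    return lambda row: row["status"] == input_status


dog = {"dog1": {"id": "dog1", "version": 1, "name": "Scooby Doo"}}
dog_change = [
    {"id": "dog1", "status": "ready", "name": "Scooby"},
    {"id": "dog2", "status": "ready", "name": "Lassie"},
    {"id": "dog3", "status": "pending", "name": "Clifford"},
]
after = upsert_from_schema(dog, dog_change, ["name"], with_status("ready"))
print(sorted(after))  # ['dog1', 'dog2']
```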
Patch action
The PATCH action works similarly to the UPSERT action, except that it only updates existing records and does not create new records.
Furthermore, it only updates the columns specified in the input data, leaving all other columns unchanged.
The `id` field must be present in the source data and must match an existing record in the target data.
Since Depot 9.9.0, PATCH operations use a changeset-based transaction approach internally, resulting in improved performance for batch updates.
You can use most schema types as a source (object, view and query) as well as file input, provided the source contains the `id` field and any columns you wish to alter.
It is currently not possible to specify a list of columns to take into account from the source schema in order to disregard other columns. Every column present in the source (except `version`, `created`, `updated`, `schema` and `hash` for object sources) is used to update the target.
You can achieve the "projection" effect by creating a query schema that selects from your actual source only the columns you intend to patch, and using that schema as the source.
Example
With the following schema:
```yaml
petstore.Dog:
  type: object
  properties:
    name:
      type: string
    age:
      type: integer
petstore.NextDogAge:
  type: query
  properties:
    id:
      type: string!
    age:
      type: integer
  sql: SELECT id, age + 1 as age FROM DOG WHERE age IS NOT NULL
  using:
    DOG: petstore.Dog
```
And the following existing data:
petstore.Dog
| id | version | name | age |
|---|---|---|---|
| dog1 | 1 | Scooby Doo | 5 |
| dog2 | 1 | Médor | 3 |
| dog3 | 1 | Corniaud | null |
Then executing the following transaction:
```json
{
  "actions": [
    {
      "operation": "PATCH",
      "target": "petstore.Dog",
      "source": "petstore.NextDogAge"
    }
  ]
}
```
will result in the following data:
| id | version | name | age |
|---|---|---|---|
| dog1 | 2 | Scooby Doo | 6 |
| dog2 | 2 | Médor | 4 |
| dog3 | 1 | Corniaud | null |
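The PATCH example can be reproduced with an in-memory sketch. It assumes that source rows whose `id` has no match in the target are simply skipped and that every patched row gets its version bumped by one; it is an illustration, not Depot's changeset-based implementation.

```python
from typing import Dict, List

Row = Dict[str, object]

# Columns of an object source that PATCH never copies, per the list above.
IGNORED_COLUMNS = {"version", "created", "updated", "schema", "hash"}


def patch(target: Dict[str, Row], source: List[Row]) -> Dict[str, Row]:
    """Model of PATCH: update only existing ids and only the columns present in the source."""
    result = {rid: dict(row) for rid, row in target.items()}
    for row in source:
        previous = result.get(row["id"])
        if previous is None:
            continue  # PATCH never creates records (skipped in this sketch)
        changes = {k: v for k, v in row.items() if k != "id" and k not in IGNORED_COLUMNS}
        result[row["id"]] = {**previous, **changes, "version": previous["version"] + 1}
    return result


dogs = {
    "dog1": {"id": "dog1", "version": 1, "name": "Scooby Doo", "age": 5},
    "dog2": {"id": "dog2", "version": 1, "name": "Médor", "age": 3},
    "dog3": {"id": "dog3", "version": 1, "name": "Corniaud", "age": None},
}
# Equivalent of petstore.NextDogAge: SELECT id, age + 1 AS age ... WHERE age IS NOT NULL
next_dog_age = [{"id": d["id"], "age": d["age"] + 1} for d in dogs.values() if d["age"] is not None]
after = patch(dogs, next_dog_age)
for row in after.values():
    print(row)
```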
Delete action
This action type deletes records from all object schemas where a matching id exists in the input location.
Syntax
```json
{
  "operation": "DELETE",
  "locationUri": "string",
  "format": "string"
}
```
- `locationUri` - S3 path to a folder containing data laid out by schema fully-qualified name
- `format` - input format, see the Formats section
Example
Given the following schema:
```yaml
petstore.Dog:
  properties:
    name:
      type: string
```
And the following pre-existing data:
| id | version | name |
|---|---|---|
| dog1 | 2 | Scooby |
| dog2 | 1 | Lassie |
| dog3 | 1 | Clifford |
And the following data in the S3 location s3://petstore/ingest2/petstore/Dog/anyfile.json
```json
{"id": "dog3"}
```
Then executing the following transaction:
```json
{
  "actions": [
    {
      "operation": "DELETE",
      "locationUri": "s3://petstore/ingest2",
      "format": "JSON"
    }
  ]
}
```
Will result in the following:
| id | version | name |
|---|---|---|
| dog1 | 2 | Scooby |
| dog2 | 1 | Lassie |
You can also issue a DELETE transaction that deletes multiple items by 'pointing at yourself', i.e. using the same schema as both source and target. Example:
```json
{
  "operation": "DELETE",
  "source": "my.Entity",
  "target": "my.Entity",
  "query": "byFkId",
  "arguments": {
    "idArg": "?"
  }
}
```
On a supported dataset backing location (storage engine), this expands to SQL such as:
```sql
DELETE
FROM MY_ENTITY
WHERE ID IN (SELECT ID FROM MY_ENTITY WHERE FK_ID = ?)
```
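The expansion can be tried out against SQLite, used here purely as a stand-in for a supported backing engine. The table and column names come from the SQL above; the sample rows and the bound value `"a"` for `idArg` are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE MY_ENTITY (ID TEXT PRIMARY KEY, FK_ID TEXT)")
conn.executemany(
    "INSERT INTO MY_ENTITY (ID, FK_ID) VALUES (?, ?)",
    [("e1", "a"), ("e2", "a"), ("e3", "b")],  # hypothetical rows
)

# The expanded self-pointing DELETE; the ? placeholder receives the idArg value.
conn.execute(
    "DELETE FROM MY_ENTITY WHERE ID IN (SELECT ID FROM MY_ENTITY WHERE FK_ID = ?)",
    ("a",),
)
conn.commit()

remaining = [row[0] for row in conn.execute("SELECT ID FROM MY_ENTITY ORDER BY ID")]
print(remaining)  # ['e3']
```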
Refresh action
Non-trivial materialized views must be refreshed before they reflect changes in their inputs. This action provides that functionality, either for the whole view or for a pre-defined subset.
Syntax
```json
{
  "operation": "REFRESH",
  "target": "name",
  "query": "name",
  "arguments": "map of string, any"
}
```
- `target` - Fully-qualified name of the view schema to refresh
- `query` - The (optional) name of a query defined on the view's source schema to act as a filter
- `arguments` - Key-value arguments to the query, if specified (optional)
Example
Given the following schemas:
```yaml
petstore.DogChange:
  properties:
    status:
      type: string
    name:
      type: string
  queries:
    withStatus:
      arguments:
        - name: inputStatus
          type: string
      expression: status == inputStatus
petstore.DogChangeStats:
  type: view
  materialized: true
  sql: SELECT STATUS AS ID, COUNT(*) AS TOTAL FROM @{dogChange} GROUP BY STATUS
  using:
    dogChange: petstore.DogChange
  primaryKey:
    - id
  properties:
    id:
      type: string
    total:
      type: integer
```
And the following existing data:
petstore.DogChange:
| id | status | name |
|---|---|---|
| dog1 | ready | Scooby |
| dog2 | ready | Lassie |
| dog3 | pending | Clifford |
petstore.DogChangeStats
| id | total |
|---|---|
| ready | 5 |
| pending | 4 |
Then executing the following transaction:
```json
{
  "actions": [
    {
      "operation": "REFRESH",
      "target": "petstore.DogChangeStats",
      "query": "withStatus",
      "arguments": {
        "inputStatus": "ready"
      }
    }
  ]
}
```
Will result in the following:
| id | total |
|---|---|
| ready | 2 |
| pending | 4 |
The pending total is not updated because the query only targets the ready status. To refresh all view rows, you could execute the following transaction:
```json
{
  "actions": [
    {
      "operation": "REFRESH",
      "target": "petstore.DogChangeStats"
    }
  ]
}
```
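For this particular view, the effect of a partial versus a full refresh can be modelled in memory. The sketch hard-codes the `COUNT(*) ... GROUP BY STATUS` logic of `petstore.DogChangeStats` and models the `withStatus` filter as a plain argument; it illustrates the semantics only and is not how Depot refreshes views.

```python
from collections import Counter
from typing import Dict, List, Optional

Row = Dict[str, str]


def refresh_stats(view: Dict[str, int], dog_change: List[Row],
                  input_status: Optional[str] = None) -> Dict[str, int]:
    """Model of REFRESH for petstore.DogChangeStats (COUNT(*) GROUP BY STATUS)."""
    if input_status is None:
        # Full refresh: every view row is recomputed from the source.
        return dict(Counter(row["status"] for row in dog_change))
    # Partial refresh: only the view row derived from source rows matching withStatus.
    result = {k: v for k, v in view.items() if k != input_status}
    total = sum(1 for row in dog_change if row["status"] == input_status)
    if total:
        result[input_status] = total
    return result


dog_change = [
    {"id": "dog1", "status": "ready", "name": "Scooby"},
    {"id": "dog2", "status": "ready", "name": "Lassie"},
    {"id": "dog3", "status": "pending", "name": "Clifford"},
]
stale = {"ready": 5, "pending": 4}
print(refresh_stats(stale, dog_change, "ready"))
print(refresh_stats(stale, dog_change))
```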
Merge action
The merge action takes data from a source (object or view schema) and merges it into a target object, optionally scoped by a named query. It is very similar to an upsert from another schema, except that rows that match the query in the target but do not exist when that same filter is applied to the source are deleted from the target.
Version history is preserved for all object schemas, so this functionality may be used to maintain the history of views.
Syntax
```json
{
  "operation": "MERGE",
  "source": "name",
  "target": "name",
  "query": "name",
  "arguments": "map of string, any",
  "sourceDatasetId": "string"
}
```
- `target` - Fully-qualified name of the object schema to merge to
- `source` - Fully-qualified name of the object or view schema to merge from
- `query` - The (optional) name of a query defined on the source schema to act as a filter
- `arguments` - Key-value arguments to the query, if specified (optional)
- `sourceDatasetId` - If specified, the dataset id that owns the `source` schema (optional)
Note: The source schema must include a unique `id` property to be usable in a merge action.
Example
Given the following schemas:
```yaml
petstore.DogChangeStats:
  type: view
  materialized: true
  sql: SELECT STATUS AS ID, COUNT(*) AS TOTAL FROM @{dogChange} GROUP BY STATUS
  using:
    dogChange: petstore.DogChange
  primaryKey:
    - id
  properties:
    id:
      type: string
    total:
      type: integer
petstore.DogChangeStatsAudit:
  type: object
  properties:
    id:
      type: string
    total:
      type: integer
```
And the following existing data:
petstore.DogChangeStats
| id | total |
|---|---|
| ready | 5 |
petstore.DogChangeStatsAudit
| id | version | total |
|---|---|---|
| ready | 1 | 2 |
| pending | 1 | 5 |
Then executing the following transaction:
```json
{
  "actions": [
    {
      "operation": "MERGE",
      "source": "petstore.DogChangeStats",
      "target": "petstore.DogChangeStatsAudit"
    }
  ]
}
```
Will result in the following data:
petstore.DogChangeStatsAudit
| id | version | total |
|---|---|---|
| ready | 2 | 5 |
That is, the ready record is updated and a new version is stored, but the pending record is removed because it did not exist in the source. A named query may also be used to target only a specific subset of the data in the source and target.
When the source schema exists in the same dataset as the target schema, or in a dependency of the target schema's dataset, there is no need to specify `sourceDatasetId`. Otherwise, set `sourceDatasetId` to the id or alias of the source dataset in the environment.
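The merge semantics can be sketched in memory as an upsert of the filtered source followed by deletion of in-scope target rows missing from it. The sketch assumes the version is bumped on every merged row and does not model the version history (tombstones) kept for deleted rows; the function names are hypothetical.

```python
from typing import Callable, Dict, Iterable, List, Optional

Row = Dict[str, object]


def merge(target: Dict[str, Row], source: Iterable[Row], target_props: List[str],
          query: Optional[Callable[[Row], bool]] = None) -> Dict[str, Row]:
    """Model of MERGE: upsert the filtered source, then delete in-scope target rows it lacks."""

    def in_scope(row: Row) -> bool:
        return query is None or query(row)

    selected = {row["id"]: row for row in source if in_scope(row)}
    result: Dict[str, Row] = {}
    for rid, row in target.items():
        if in_scope(row) and rid not in selected:
            continue  # matches the filter in the target but not in the source: deleted
        result[rid] = dict(row)
    for rid, row in selected.items():
        previous = result.get(rid)
        copied = {"id": rid, **{p: row.get(p) for p in target_props}}
        copied["version"] = 1 if previous is None else previous["version"] + 1
        result[rid] = copied
    return result


stats_view = [{"id": "ready", "total": 5}]
audit = {
    "ready": {"id": "ready", "version": 1, "total": 2},
    "pending": {"id": "pending", "version": 1, "total": 5},
}
after = merge(audit, stats_view, ["total"])
print(after)
```

Passing a predicate as `query` restricts both the deletion and the upsert to that subset, mirroring the named-query scoping described above.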
Replace action
The replace action is similar to the merge action, but optimised for partitioned updates of batch object data. To achieve the performance optimisation, it loses the ability to individually version each row and has other small caveats.
The replace action takes data from a source (object or view schema) and uses it to replace the corresponding entries in the target object. The action is normally scoped by a batch ID, which is available on every record in the source and is part of the data's logical ID (alternatively, it must be guaranteed that each entry ID belongs to just a single batch).
The replacement creates tombstone history entries for each record that exists in the target, but no longer exists in the source. The operation then deletes all records matching the batch ID, and inserts all records from the source (which is also scoped by the same batch ID).
The operation handles versioning/history in a batch-update manner:
- The `version` of the replaced entries is always increased, even if the data in target and source is identical.
  - The next version is taken as the maximum of the existing version +10. This accommodates partial tombstone entries written by an earlier merge action (and improves readability compared with a +2 bump).
  - If history is disabled, the next version is just the maximum of the existing version +1.
- The `updated` timestamp is always set to the transaction time.
- The `created` timestamp is taken as the minimum of the `created` values for the current entries in the target for the given batch.
These versioning "shortcuts" allow the replace action to be implemented without checks/joins for individual entries, thus achieving much better performance, while still satisfying the business requirements for batch-updated data.
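The versioning rules above can be written down as a small function over one batch. This is a sketch under assumptions: the "maximum of the existing version" is taken over all target rows in the batch, timestamps are ISO date strings (so `min` compares them correctly), an empty target batch starts at version 1, and tombstones are only produced when history is enabled.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def replace_batch(target_batch: List[Row], source_batch: List[Row], now: str,
                  history: bool = True) -> Tuple[List[Row], List[Row]]:
    """Model of REPLACE versioning for one batch (rows already filtered by the query).

    Returns (new rows for the batch, tombstone history entries).
    """
    if target_batch:
        step = 10 if history else 1
        # Assumption: the maximum is taken over the whole target batch.
        next_version = max(row["version"] for row in target_batch) + step
        created = min(row["created"] for row in target_batch)  # ISO strings sort by date
    else:
        next_version, created = 1, now  # assumption: an empty batch starts fresh
    source_ids = {row["id"] for row in source_batch}
    rows = [{**row, "version": next_version, "created": created, "updated": now}
            for row in source_batch]
    # Tombstones keep the original data; written only when history is enabled (assumption).
    tombstones = [{**row, "version": next_version, "updated": now, "__deleted": True}
                  for row in target_batch if row["id"] not in source_ids] if history else []
    return rows, tombstones


target = [
    {"id": "a", "status": "old", "version": 1, "created": "2023-01-01", "updated": "2023-01-01"},
    {"id": "b", "status": "old", "version": 3, "created": "2023-01-05", "updated": "2023-01-05"},
]
source = [{"id": "a", "status": "new"}, {"id": "c", "status": "new"}]
rows, tombstones = replace_batch(target, source, now="2023-02-01")
print(rows)
print(tombstones)
```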
Syntax
```json
{
  "operation": "REPLACE",
  "source": "name",
  "target": "name",
  "query": "name",
  "arguments": "map of string, any",
  "sourceDatasetId": "string"
}
```
- `target` - Fully-qualified name of the object schema to replace entries in
- `source` - Fully-qualified name of the object or view schema to replace entries from
- `query` - The name of a query defined on the source schema to act as a filter. Optional, but highly recommended to indicate the partition batch scope, as otherwise the whole table will be replaced.
- `arguments` - Key-value arguments to the query, if specified (optional)
- `sourceDatasetId` - If specified, the dataset id that owns the `source` schema (optional)
Note: The source schema must include a unique `id` property to be usable in a replace action.
The query arguments must partition the data into batches such that each `id` belongs to a single batch.
Example
Given the schemas:
```yaml
petstore.DogWithYear:
  type: object
  properties:
    name:
      type: string
    yearOfBirth:
      type: integer
      required: true
petstore.AdoptedDogs:
  type: view
  sql: 'SELECT ID, NAME, YEAR_OF_BIRTH, ''adopted'' AS STATUS FROM @{dog}'
  using:
    dog: petstore.DogWithYear
  properties:
    id:
      type: string
      required: true
    name:
      type: string
    yearOfBirth:
      type: integer
      required: true
    status:
      type: string
  queries:
    byYear:
      arguments:
        - name: inputYear
          type: integer
      expression: yearOfBirth == inputYear
petstore.DogYearStatus:
  type: object
  properties:
    name:
      type: string
    yearOfBirth:
      type: integer
      required: true
    status:
      type: string
```
And the following existing data:
petstore.DogWithYear
| id | name | yearOfBirth |
|---|---|---|
| 1 | Toodles | 2021 |
| 2 | Geoff | 2020 |
| 3 | Minnie | 2021 |
| 4 | Jimmie | 2021 |
petstore.DogYearStatus
| id | name | yearOfBirth | status | version | created | updated |
|---|---|---|---|---|---|---|
| 1 | Toodles | 2021 | arrived | 1 | 2023-01-01 | 2023-01-01 |
| 2 | Geoff | 2020 | arrived | 1 | 2023-01-01 | 2023-01-01 |
| 4 | Jimmie | 2021 | adopted | 2 | 2023-01-02 | 2023-01-02 |
| 5 | Raffles | 2021 | unknown | 1 | 2023-01-02 | 2023-01-02 |
Then executing the following transaction:
```json
{
  "actions": [
    {
      "operation": "REPLACE",
      "target": "petstore.DogYearStatus",
      "source": "petstore.AdoptedDogs",
      "query": "byYear",
      "arguments": {
        "inputYear": 2021
      }
    }
  ]
}
```
Will result in the following:
petstore.DogYearStatus
| id | name | yearOfBirth | status | version | created | updated |
|---|---|---|---|---|---|---|
| 1 | Toodles | 2021 | adopted | 11 | 2023-01-01 | now |
| 2 | Geoff | 2020 | arrived | 1 | 2023-01-01 | 2023-01-01 |
| 3 | Minnie | 2021 | adopted | 11 | 2023-01-01 | now |
| 4 | Jimmie | 2021 | adopted | 11 | 2023-01-01 | now |
Note the changed created timestamp for Jimmie and the uniform version/updated bump across the 2021 batch. The newly created Minnie entry inherits the minimum created timestamp of the whole batch. Geoff (born 2020) falls outside the batch selected by the query and is left unchanged.
Furthermore, the tombstone history entry for the deleted Raffles is created with the original data:
| id | name | yearOfBirth | status | version | created | updated | __deleted |
|---|---|---|---|---|---|---|---|
| 5 | Raffles | 2021 | unknown | 11 | 2023-01-02 | now | true |
Insert ignore action
The insert ignore action has the same syntax and usage as the UPSERT action, however if data already exists with the same id in the target, it will not be replaced.
Syntax
```json
{
  "operation": "INSERT_IGNORE",
  "locationUri": "string",
  "format": "string"
}
```
or
```json
{
  "operation": "INSERT_IGNORE",
  "source": "name",
  "target": "name",
  "query": "name",
  "arguments": "map of string, any"
}
```
Available configuration properties are the same as for upsert from S3 location and upsert from other schema respectively.
Example
With the following schema:
```yaml
petstore.Dog:
  properties:
    name:
      type: string
petstore.DogChange:
  properties:
    name:
      type: string
```
And the following existing data:
petstore.Dog
| id | version | name |
|---|---|---|
| dog1 | 1 | Scooby Doo |
petstore.DogChange
| id | name |
|---|---|
| dog1 | Scooby |
| dog2 | Lassie |
| dog3 | Clifford |
Then executing the following transaction:
```json
{
  "actions": [
    {
      "operation": "INSERT_IGNORE",
      "target": "petstore.Dog",
      "source": "petstore.DogChange"
    }
  ]
}
```
Will result in the following:
| id | version | name |
|---|---|---|
| dog1 | 1 | Scooby Doo |
| dog2 | 1 | Lassie |
| dog3 | 1 | Clifford |
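INSERT_IGNORE differs from an upsert only in skipping ids that already exist. A minimal in-memory model, assuming that only `id` and the target's properties are copied; `insert_ignore` is a hypothetical name:

```python
from typing import Dict, List

Row = Dict[str, object]


def insert_ignore(target: Dict[str, Row], source: List[Row],
                  target_props: List[str]) -> Dict[str, Row]:
    """Model of INSERT_IGNORE: insert new ids at version 1, leave existing ids untouched."""
    result = {rid: dict(row) for rid, row in target.items()}
    for row in source:
        if row["id"] in result:
            continue  # existing record wins; nothing is replaced
        result[row["id"]] = {"id": row["id"], **{p: row.get(p) for p in target_props}, "version": 1}
    return result


dog = {"dog1": {"id": "dog1", "version": 1, "name": "Scooby Doo"}}
dog_change = [
    {"id": "dog1", "name": "Scooby"},
    {"id": "dog2", "name": "Lassie"},
    {"id": "dog3", "name": "Clifford"},
]
after = insert_ignore(dog, dog_change, ["name"])
for row in after.values():
    print(row)
```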
Insert record action
The insert records action has the same syntax and usage as the UPSERT action, except that the target must be a record schema. This transaction is very fast because it copies/inserts directly into the target; however, it does not add any additional meta fields and does not support expression fields.
Syntax
```json
{
  "operation": "INSERT_RECORD",
  "locationUri": "string",
  "format": "string"
}
```
Delete record action
The delete records action is a special action that takes a record schema as its target; a named query must be provided, together with its arguments (if required).
Syntax
```json
{
  "operation": "DELETE_RECORD",
  "target": "string",
  "query": "string",
  "arguments": "map of string, any (optional)"
}
```
The delete records action currently does not support sourcing the records to delete from a file or from another table or view. Contributions welcome.
Using queries in transactions
It is possible to define complex queries with argument replacement at arbitrary points in the query (rather than just in a synthetic where clause). These queries can be used anywhere a named query can be used.
Example
With the following schema:
```yaml
petstore.Dog:
  properties:
    name:
      type: string
petstore.DogQuery:
  type: query
  sql: SELECT * FROM @{dog} WHERE name=${name}
  using:
    dog: petstore.Dog
  arguments:
    - name: name
      type: string
  properties:
    name:
      type: string
```
And the following existing data:
petstore.Dog
| id | version | name |
|---|---|---|
| dog1 | 1 | Scooby Doo |
| dog2 | 1 | Lassie |
Then executing the following transaction:
```json
{
  "actions": [
    {
      "operation": "UPSERT",
      "target": "petstore.Dog",
      "source": "petstore.DogQuery",
      "arguments": {
        "name": "Lassie"
      }
    }
  ]
}
```
Will upsert only the row selected by the query (dog1 is left untouched):
| id | version | name |
|---|---|---|
| dog2 | 1 | Lassie |
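One way to picture how such a query becomes executable SQL is to resolve each `@{alias}` through `using` and turn each `${argument}` into a bind parameter. This is a sketch, not Depot's renderer: `render_query` is hypothetical, the physical table name `PETSTORE_DOG` is invented, and SQLite stands in for the real backing engine.

```python
import re
import sqlite3
from typing import Dict, List, Tuple


def render_query(sql: str, using: Dict[str, str], physical: Dict[str, str],
                 arguments: Dict[str, object]) -> Tuple[str, List[object]]:
    """Replace @{alias} with a table name and ${arg} with a ? bind parameter."""
    params: List[object] = []

    def table(match: "re.Match[str]") -> str:
        # alias -> schema name (from `using`) -> physical table name
        return physical[using[match.group(1)]]

    def argument(match: "re.Match[str]") -> str:
        # Bind values instead of splicing them into the SQL text.
        params.append(arguments[match.group(1)])
        return "?"

    sql = re.sub(r"@\{(\w+)\}", table, sql)
    sql = re.sub(r"\$\{(\w+)\}", argument, sql)
    return sql, params


sql, params = render_query(
    "SELECT * FROM @{dog} WHERE name=${name}",
    using={"dog": "petstore.Dog"},
    physical={"petstore.Dog": "PETSTORE_DOG"},  # hypothetical physical table name
    arguments={"name": "Lassie"},
)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE PETSTORE_DOG (id TEXT, version INTEGER, name TEXT)")
conn.executemany("INSERT INTO PETSTORE_DOG VALUES (?, ?, ?)",
                 [("dog1", 1, "Scooby Doo"), ("dog2", 1, "Lassie")])
print(sql, params)
print(conn.execute(sql, params).fetchall())
```

Using bind parameters rather than string interpolation keeps argument values out of the SQL text.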
Formats
The following input formats are supported:
JSON
Input data is in JSON-lines format. See: jsonlines.org.
The default columnFormat for this format is AS_SPECIFIED.
PARQUET
Input data is in Parquet files. In this case the `locationUri` should point to a parent folder, and all files in that folder will be merged.
The default columnFormat for this format is AS_SPECIFIED.
DMS
Input data is in Parquet DMS files. In this case the `Op` column gets special handling: when `Op = 'D'` for a given record, the target row is deleted.
The default columnFormat for this format is UPPER.
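The DMS handling can be modelled as a per-record upsert or delete. The sketch assumes records are applied in file order, that any `Op` value other than `'D'` (AWS DMS also emits `'I'` and `'U'`) is treated as an upsert, and that field names have already been mapped to schema property names; `apply_dms` is a hypothetical name.

```python
from typing import Dict, List

Row = Dict[str, object]


def apply_dms(target: Dict[str, Row], records: List[Row]) -> Dict[str, Row]:
    """Model of the DMS format: Op == 'D' deletes the row, other ops upsert it."""
    result = {rid: dict(row) for rid, row in target.items()}
    for record in records:  # assumption: applied in file order
        data = {k: v for k, v in record.items() if k != "Op"}
        rid = data["id"]
        if record.get("Op") == "D":
            result.pop(rid, None)  # deleting a missing id is a no-op here
        elif rid in result:
            result[rid] = {**data, "version": result[rid]["version"] + 1}
        else:
            result[rid] = {**data, "version": 1}
    return result


dogs = {
    "dog1": {"id": "dog1", "name": "Scooby Doo", "version": 1},
    "dog2": {"id": "dog2", "name": "Lassie", "version": 1},
}
changes = [
    {"Op": "U", "id": "dog1", "name": "Scooby"},
    {"Op": "D", "id": "dog2"},
    {"Op": "I", "id": "dog3", "name": "Clifford"},
]
after = apply_dms(dogs, changes)
print(after)
```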
Column formats
`columnFormat` enforces a specific case for field names in the data file: if data fields do not match the specified format, they are ignored.
When running tests with local files, these rules are relaxed, and any variant of field name case in the input is supported as long as it is not ambiguous.
The following column formats are supported:
AS_SPECIFIED
Data in the input location should have field names that match the field names in the schema.
`myField` in the schema should be `myField` in the input data.
UPPER
Data in the input location should have field names that are uppercase variants of the field names in the schema.
`myField` in the schema should be `MYFIELD` in the input data.
LOWER
Data in the input location should have field names that are lowercase variants of the field names in the schema.
`myField` in the schema should be `myfield` in the input data.
UPPER_SNAKE
Data in the input location should have field names that are upper snake case variants of the field names in the schema.
`myField` in the schema should be `MY_FIELD` in the input data.
LOWER_SNAKE
Data in the input location should have field names that are lower snake case variants of the field names in the schema.
`myField` in the schema should be `my_field` in the input data.
UPPER_CAMEL
Data in the input location should have field names that are upper camel case variants of the field names in the schema.
`myField` in the schema should be `MyField` in the input data.
LOWER_CAMEL
Data in the input location should have field names that are lower camel case variants of the field names in the schema.
`MyField` in the schema should be `myField` in the input data.
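The conversions above can be expressed as a table of functions from a schema field name to the expected input field name. The word-splitting rule (a regular expression over camelCase/PascalCase names) is an assumption; Depot's handling of acronyms and digits is not documented here.

```python
import re
from typing import Callable, Dict, List


def split_words(name: str) -> List[str]:
    """Split a camelCase or PascalCase name into words: "myField" -> ["my", "Field"]."""
    return re.findall(r"[A-Z]?[a-z0-9]+|[A-Z]+(?![a-z])", name)


def _lower_camel(words: List[str]) -> str:
    return words[0].lower() + "".join(w.capitalize() for w in words[1:])


# Schema field name -> field name expected in the input data, per columnFormat.
FORMATS: Dict[str, Callable[[str], str]] = {
    "AS_SPECIFIED": lambda n: n,
    "UPPER": str.upper,
    "LOWER": str.lower,
    "UPPER_SNAKE": lambda n: "_".join(w.upper() for w in split_words(n)),
    "LOWER_SNAKE": lambda n: "_".join(w.lower() for w in split_words(n)),
    "UPPER_CAMEL": lambda n: "".join(w.capitalize() for w in split_words(n)),
    "LOWER_CAMEL": lambda n: _lower_camel(split_words(n)),
}

for fmt, convert in FORMATS.items():
    print(f"{fmt:<13} myField -> {convert('myField')}")
```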
Column mappings
`columnMapping` maps data file field names to table columns. Use it when some fields do not match the configured case, or to rename a field during the operation.
For example:
```json
{
  "columnMapping": {
    "myField": "FIELD_NAME",
    "secondField": "oP"
  }
}
```
If the column mapping above is passed in a transaction action that loads data from a file, Depot looks for `FIELD_NAME` in the data file and maps it to the `myField` property, and looks for `oP` and maps it to `secondField`. All other fields are looked up using the configured `columnFormat`.
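The lookup order can be sketched as follows: a property listed in `columnMapping` is read from the mapped field, and every other property from the name produced by the `columnFormat`. `resolve_fields` and the extra `thirdField` property are hypothetical, and `str.upper` stands in for the `UPPER` format.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]


def resolve_fields(properties: List[str], record: Row, column_mapping: Dict[str, str],
                   column_format: Callable[[str], str]) -> Row:
    """Pick a value for each schema property from a raw data-file record.

    A property listed in column_mapping is read from the mapped field name;
    any other property is read from the name produced by column_format.
    Fields matching neither are ignored.
    """
    values: Row = {}
    for prop in properties:
        field = column_mapping.get(prop, column_format(prop))
        if field in record:
            values[prop] = record[field]
    return values


record = {"FIELD_NAME": "a", "oP": "b", "THIRDFIELD": "c", "UNRELATED": "d"}
mapping = {"myField": "FIELD_NAME", "secondField": "oP"}
print(resolve_fields(["myField", "secondField", "thirdField"], record, mapping, str.upper))
# prints: {'myField': 'a', 'secondField': 'b', 'thirdField': 'c'}
```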
Implicit, Sequence and Expression-defined id fields
Depot supports object schemas where the id field is one of:
- a string (the default), which Depot populates with a UUID when feasible
- a value pulled from a sequence
- a value computed as an expression from other fields
Depot's transaction engine currently has the following limitations:
- UUID generation for missing `id` fields is currently implemented in the Unitary API engine (REST, GQL, Lambda) but not in transactions; the source provider is responsible for populating all `id` fields.
  - There is a ticket for implementing this feature in the transaction engine, booked as DPT-2481.
- Computing an expression based on other fields works fully in the Unitary API engine (REST, GQL, Lambda), but in the Transaction engine it is limited to the capabilities of Basestar expression-to-SQL translation.
  - DPT-2481 suggests an approach to broaden translations using system UDFs; this will require further ticketing and implementation.
Furthermore:
- Pulling from a sequence to populate an `id` field may produce surprising results in the `MERGE` and `DELETE` operations, due to the interaction between MERGE and DELETE semantics and the fact that sequence values are not known in advance.