Depot Transactions from CDK processes

The easiest way to run a Depot Transaction is from a process that is itself an AWS Step Function, deployed via CDK.

The following example walks through writing, testing, and deploying a process expressed as Depot Transactions:

In your schema package

note

This example is based on the schemas and examples described in transactions.

  1. In your schema package, next to your schemas, create a file named package-transactions.ts with the following contents:
import { TransactionRequest } from "@stage-tech/depot-schema";

export class PetSchemaTransactions {

  /**
   * Ingests (or updates) a set of dogs provided in JSON format on S3 from `locationUri`
   * into the main `petstore.Dog` table.
   */
  public static ingestDogs(locationUri: string): TransactionRequest.Action[] {
    return [
      {
        operation: TransactionRequest.Operation.UPSERT,
        locationUri: locationUri,
        format: TransactionRequest.Format.JSON
      }
    ];
  }

  /**
   * Inserts (or updates) the dogs from `petstore.DogChange` that are ready
   * into the main `petstore.Dog` table.
   */
  public static insertReady(): TransactionRequest.Action[] {
    return [
      {
        operation: TransactionRequest.Operation.UPSERT,
        target: "petstore.Dog",
        source: "petstore.DogChange",
        query: "withStatus",
        arguments: {
          inputStatus: "ready"
        }
      }
    ];
  }
}
  2. Test your transactions using Depot Test:

const petSchema: MergedSchemas = { /* ... */ };

describe("Pet Schema transactions", () => {
  it("should successfully insert ready dogs", async () => {

    // Seed content, run the transaction template, then check the result.
    const scenario = DepotTest.in(petSchema)
      .setContent({ /* .... */ })
      .transaction(PetSchemaTransactions.insertReady())
      .check({ /* ... */ });

    await scenario.run();
  });
});

The Action[] returned by the transaction templates in PetSchemaTransactions is fed directly to the DepotTest.transaction() primitive.
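Because the templates are plain functions that return data, their shape can also be inspected without any Depot tooling. The following is a minimal sketch that uses local stand-in types instead of the real `TransactionRequest` from `@stage-tech/depot-schema`; the enum string values, the optional fields, and the example URI are assumptions made for illustration only.

```typescript
// Stand-ins for TransactionRequest.Operation / Format / Action.
// The string values and field optionality are assumptions for this sketch.
enum Operation { UPSERT = "UPSERT" }
enum Format { JSON = "JSON" }

interface Action {
  operation: Operation;
  locationUri?: string;
  format?: Format;
  target?: string;
  source?: string;
  query?: string;
  arguments?: Record<string, string>;
}

// Mirrors the shape returned by PetSchemaTransactions.ingestDogs().
function ingestDogs(locationUri: string): Action[] {
  return [{ operation: Operation.UPSERT, locationUri, format: Format.JSON }];
}

// Mirrors the shape returned by PetSchemaTransactions.insertReady().
function insertReady(): Action[] {
  return [
    {
      operation: Operation.UPSERT,
      target: "petstore.Dog",
      source: "petstore.DogChange",
      query: "withStatus",
      arguments: { inputStatus: "ready" },
    },
  ];
}

// The templates are pure: the same input always yields the same Action[],
// which is why the value tested is also the value that can be deployed.
const actions = ingestDogs("s3://example-bucket/dogs.json"); // hypothetical URI
console.log(JSON.stringify(actions, null, 2));
console.log(JSON.stringify(insertReady(), null, 2));
```

Keeping templates free of side effects is what allows one definition to serve both the test scenario and the deployed step.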

In your CDK package

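import { IStageDepotEnvironment } from "@stage-tech/depot-cdk";
import { Construct } from "constructs";
// definitionBody is an AWS CDK v2 property, so the v2 module paths are used here.
import * as s3 from "aws-cdk-lib/aws-s3";
import * as sfn from "aws-cdk-lib/aws-stepfunctions";
// PetSchemaTransactions is imported from your schema package.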

export class MyProcess extends Construct {
  constructor(scope: Construct, id: string, readonly props: MyProcessProps) {
    super(scope, id);

    // Resolved at execution time from the bucket name and the state input.
    const locationUri = sfn.JsonPath.format("s3://{}/{}",
      props.myBucket.bucketName,
      sfn.JsonPath.stringAt("$.Input.dogInputJson"));

    // Step 1: ingest the JSON file from S3.
    const ingestJsonTask = props.depot.runTransaction(this, 'Ingest S3 Dogs', {
      dataset: props.petShopDataset,
      actions: PetSchemaTransactions.ingestDogs(locationUri)
    });
    // Step 2: move the ready dogs into the main table.
    const ingestReadyTask = props.depot.runTransaction(this, 'Ingest Ready Dogs', {
      dataset: props.petShopDataset,
      actions: PetSchemaTransactions.insertReady()
    });

    const myProcess = new sfn.StateMachine(this, 'MyProcess', {
      stateMachineName: 'my-process',
      definitionBody: sfn.DefinitionBody.fromChainable(
        ingestJsonTask.next(ingestReadyTask)
      )
    });
  }
}

export interface MyProcessProps {
  readonly depot: IStageDepotEnvironment;
  readonly petShopDataset: string;
  readonly myBucket: s3.IBucket;
}

This creates a Step Function with two consecutive steps, each running its own Transaction containing a single Action. The Action[] shapes that are deployed are exactly the ones tested with DepotTest.
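The `locationUri` in the first step is not a fixed string: `sfn.JsonPath.format` defers the substitution to execution time, when each `{}` placeholder is filled in order. The sketch below is a rough local emulation of that substitution, intended only to show what the resulting URI looks like. The `formatLike` helper, the bucket name, and the input key are hypothetical, and the emulation ignores the escaping rules of the real intrinsic function.

```typescript
// Rough emulation of ordered "{}" substitution; not the Step Functions implementation.
function formatLike(template: string, ...values: string[]): string {
  let index = 0;
  return template.replace(/\{\}/g, () => {
    const value = values[index];
    if (value === undefined) {
      throw new Error("formatLike: fewer values than placeholders");
    }
    index += 1;
    return value;
  });
}

// Hypothetical execution input matching the "$.Input.dogInputJson" path.
const executionInput = { Input: { dogInputJson: "incoming/dogs.json" } };

const resolvedUri = formatLike(
  "s3://{}/{}",
  "my-pet-bucket", // hypothetical bucket name
  executionInput.Input.dogInputJson,
);

console.log(resolvedUri);
```

With these assumed values, the first step would read its dogs from `s3://my-pet-bucket/incoming/dogs.json`.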

Snowflake transaction tagging

Tags are passed to Snowflake as key-value pairs and can be used to filter and identify transactions in Snowflake. For example:

props.depot.runTransaction(this, 'Run transaction from Data', {
  dataset: props.petShopDataset,
  actions: PetSchemaTransactions.ingestDogs(locationUri),
  tags: {
    tagName: 'Tag value',
    tag2: 'one more'
  },
});
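When several steps of one process should carry the same tags, a small helper can keep them consistent. This is a minimal sketch, assuming `tags` accepts a flat map of string keys to string values as in the example above; `withDefaultTags`, `defaultTags`, and the tag names are hypothetical and not part of `@stage-tech/depot-cdk`.

```typescript
type Tags = Record<string, string>;

// Hypothetical tags shared by every step of the process.
const defaultTags: Tags = { process: "my-process", team: "pets" };

// Merges per-step tags over the defaults; step-specific values win on conflict.
function withDefaultTags(stepTags: Tags = {}): Tags {
  return { ...defaultTags, ...stepTags };
}

const ingestTags = withDefaultTags({ step: "ingest-dogs", team: "dogs" });
console.log(ingestTags);
```

The merged object could then be passed as the `tags` property of `runTransaction`, so that every transaction of the process can be filtered by the shared keys.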