Skip to content

Write SQL queries

Transform data from the CDF staging area into a data model using built-in and custom Spark SQL queries. Select Switch to SQL editor on the Transform data page to create a transformation in Spark SQL. This article describes the queries and explains how you can load data incrementally.

:::tip Tip The SQL editor offers built-in code completion and built-in Spark SQL functions and Cognite custom SQL functions. :::

:::caution Note Your changes won't be kept if you switch from the SQL editor to the mapping editor. :::

Read from a CDF staging table

To select data from a CDF staging table, use the syntax mydb.mytable:

```sql showLineNumbers select * from database-name.table-name

If your database or table name contains special characters, enclose the name in backticks, for example `` `my-db`.`my table` ``.

### Avoid schema inference

Transformations infer schemas in the CDF staging table, but this process only uses a subset of all the rows in the table. You can avoid schema inference and write a schema fitted your data.

To avoid schema inference:

```sql showLineNumbers
  select
   *
  from
   cdf_raw("database-name", "table-name")

This returns data with the schema key:STRING, lastUpdatedTime:TIMESTAMP, columns:STRING, where the columns string contains the JSON value encoded as a string.

Here's an example of how to enforce a user-defined schema:

```sql showLineNumbers select

 get_json_object(columns, '$.externalId') AS externalId,

 timestamp(get_json_object(columns, '$.timestamp')) AS timestamp,

 double(get_json_object(columns, '$.value')) AS value

from
 cdf_raw("database-name", "table-name")

```

Read from other CDF resource types

To select other CDF resource types, use the syntax _cdf.resource_type. sql showLineNumbers select * from _cdf.events

The supported resource types are:

  • _cdf.events
  • _cdf.assets
  • _cdf.files
  • _cdf.timeseries
  • _cdf.sequences
  • _cdf_sequences.<sequence_externalId>
  • _cdf.datapoints
  • _cdf.stringdatapoints
  • _cdf.labels
  • _cdf.relationships

Load data incrementally

When reading from staging tables, you probably want to transform only the data that has changed since the last transformation job ran. To achieve this, you can filter on the lastUpdatedTime column to query for the rows that have changed after a specific timestamp. When filtering on lastUpdatedTime, the filter is pushed down to the RAW service itself, so this query can be performed efficiently. For example: select * from mydb.mytable where lastUpdatedTime > to_timestamp(123456).

Instead of encoding the timestamp directly in the query and manually keeping it up to date every time new data has been processed, you can use the is_new function. This function returns true when a row has changed since the last time the transformation was run and false otherwise.

The first time you run a transformation using the query below, all the rows of mytable will be processed:

```sql showLineNumbers select * from mydb.mytable where is_new("mydb_mytable", lastUpdatedTime)

If the transformation completes successfully, the **second** run will only process rows that have changed since the first run.

If the transformation fails, `is_new` filters the same rows the next time the transformation is run. This ensures that there is no data loss in the transformation from source to destination.

:::info note
Incremental load is disabled when previewing query results. That is, `is_new` will always return `true` for all rows.
:::

Each `is_new` filter is identified by a name (for example,`"mydb_mytable"`) and can be set to any constant string. This allows you to differentiate between multiple calls to `is_new` in the same query and use `is_new` to filter on multiple tables. To easily identify the different filters, we recommend that you use the name of the table as the name of the `is_new` filter.

## Backfill

To process all the data even if it hasn't changed since the last transformation, change the name of the `is_new` filter, for example, by adding a postfix with an incrementing number (e.g. `"mydb_mytable_1"`).

This is especially useful when the logic of the query changes and data that has already been imported needs to be updated accordingly.

## Custom SQL functions

In addition to the built-in [Spark SQL functions](https://spark.apache.org/docs/2.4.0/api/sql), we also provide a set of custom SQL functions to help you write efficient transformations.

:::info note
When a function expects `var_args`, it allows a variable number of arguments of any type, including star `*`.
:::

### get_names

- **get_names(var_args): Array[String]**

Returns an array of the field names of a struct or row.

**Example**

```sql showLineNumbers
select get_names(*) from mydb.mytable
-- Returns the column names of 'mydb.mytable'

```sql showLineNumbers select get_names(some_struct.*) from mydb.mytable -- Returns the field names of 'some_struct'

### cast_to_strings

- **cast_to_strings(var_args): Array[String]**

Casts the arguments to an array of strings. It handles array, struct and map types by casting it to JSON strings.

**Example**

```sql showLineNumbers
select cast_to_strings(*) from mydb.mytable
-- Returns the values of all columns in 'mydb.mytable' as strings

to_metadata

  • to_metadata(var_args): Map[String, String]

Creates metadata compatible type from the arguments. In practice it does map_from_arrays(get_names(var_args), cast_to_strings(var_args)). Use this function when you want to transform your columns or structures into a format that fits the metadata field in CDF.

Example

```sql showLineNumbers select to_metadata(*) from mydb.mytable -- Creates a metadata structure from all the columns found in 'mydb.mytable'

### to_metadata_except

- **to_metadata_except(excludeFilter: Array[String], var_args)**

Returns a metadata structure (`Map[String, String]`) where strings found in `excludeFilter` will exclude keys from `var_args`.

Use this function when you want to put most, but not all, columns into metadata, for example `to_metadata_except(array("someColumnToExclude"), *)`

**Example**

```sql showLineNumbers
select to_metadata_except(array("myCol"), myCol, testCol) from mydb.mytable
-- Creates a map where myCol is filtered out.
-- The result in this case will be Map("testCol" -> testCol.value.toString)

asset_ids

Attempts to find asset names under the given criteria and return the IDs of the matching assets. Three variations are available.

Attempts to find given assetNames in all assets.

  • asset_ids(assetNames: Array[String]): Array[BigInt]

Attempts to find assetNames in the asset hierarchy with rootAssetName as their root asset.

  • asset_ids(assetNames: Array[String], rootAssetName: String): Array[BigInt]

Attempts to find assetNames that belong to the datasetIds.

  • asset_ids(assetNames: Array[String], datasetIds: Array[Long]): Array[BigInt]

Attempts to find assetNames that belong to the datasetIds under the rootAssetName.

  • asset_ids(assetNames: Array[String], rootAssetName: String, datasetIds: Array[Long]): Array[BigInt]

See Assets for more information about assets in CDF.

:::caution important The entire job will be aborted if asset_ids() did not find any matching assets. :::

Example

```sql showLineNumbers select asset_ids(array("PV10", "PV11")) select asset_ids(array("PV10", "PV11"), "MyBoat") select asset_ids(array("PV10", "PV11"), array(254343, 23433, 54343)) select asset_ids(array("PV10", "PV11"), array(dataset_id("pv-254343-ext-id"), 23433, 54343)) select asset_ids(array("PV10", "PV11"), "MyBoat", array(dataset_id("pv-254343-ext-id"), 23433, 54343))

### is_new

- **is_new(name: String, version: long)**

Returns `true` if the version provided is higher than the version found with the specified name, based on the last time the transformation was run. version can be any column of dataype `long` with only incremental values ingested. A popular example is the `lastUpdatedTime` column.

- If the transformation completes successfully, the next transformation job only processes rows that have changed since the start of the last successfully completed transformation job.

- If the transformation fails, `is_new` processes all rows that have changed since the start of the last successful run. This ensures no data loss in the transformation from source to destination. See also [Load data incrementally](#load-data-incrementally).

:::tip Tip

If you're using more than one occurrence of `is_new()` in one transformation, we recommend that you use different variable names. This guarantees that subqueries within one transformation don't override the `lastUpdatedTime` record before the transformation is completed.
:::

**Example**

```sql showLineNumbers
select * from mydb.mytable where is_new("mydb_mytable_version", lastUpdatedTime)
-- Returns only rows that have changed since the last successful run

dataset_id

  • dataset_id(externalId: String): BigInt

Attempts to find the id of the given data set by externalId and returns the id if the externalId exists.

Example

```sql showLineNumbers select dataset_id("EXAMPLE_DATASET") as dataSetId

### cdf_assetSubtree

- **cdf_assetSubtree(externalId: String or id: BigInt): Table[Asset]**

Returns an asset subtree under a specific asset in an asset hierarchy, that is, all the child assets for a specific asset in an asset hierarchy are returned.

:::caution important
If the total size of subtree exceeds 100,000 assets, an error will be returned.
:::

**Example**

```sql showLineNumbers
select * from cdf_assetSubtree('externalId of an asset')
select * from cdf_assetSubtree('id of an asset')

cdf_nodes

  • cdf_nodes(space of the view: String, externalId of the view: String, version of the view: String): Table[Nodes]
  • cdf_nodes(): Table[Nodes]

Returns nodes in the CDF project as a table.

  • cdf_nodes() returns space and externalId of all nodes in the CDF project.
  • cdf_nodes("space of the view: String", "externalId of the view: String"," version of the view: String") returns a table with nodes ingested with view as reference.

    The table contains space and externalId columns and columns for each property in the view.

Example

```sql showLineNumbers select * from cdf_nodes('space of the view: String', 'externalId of the view: String', 'version of the view: String') select * from cdf_nodes()

### cdf_edges

- **cdf_edges("space of the view: String", "externalId of the view: String", "version of the view: String"): Table[Edges]**
- **cdf_edges(): Table[Edges]**

Returns edges in the CDF project as a table.

- `cdf_edges()` returns `space`, `externalId`, `startNode`, `endNode`, and `type` of all edges in a CDF project.

- `cdf_edges(space of the view: String, externalId of the view: String, version of the view: String)` returns a table with edges ingested with `view` as reference. <br></br>
  The table contains `space`, `externalId`, `startNode`, `endNode`, and `type` columns and columns for each property in the `view`.

**Example**

```sql showLineNumbers
select * from cdf_edges('space of the view: String', 'externalId of the view: String', 'version of the view: String')
select * from cdf_edges()

node_reference

  • node_reference("space: String", "externalId: String"): STRUCT<"space:string", "externalId:string">
  • node_reference("externalId: String"): STRUCT<"space:String", "externalId:String">

To reference a node, you need the space externalId of the node and the node externalId. Typically, you reference a node when writing or filtering edges based on startNode and endNode.

node_reference accepts the single parameter externalId of the node. The target/instance space set at the transformation is used as the space externalId of the node.

:::caution important If you're using node_reference for filtering i.e. in your where clause, you must add the space externalId and the node externalId. :::

Example

```sql showLineNumbers select node_reference('space externalId of a node', 'externalId of a node') as startNode, node_reference('space externalId of a node', 'externalId of a node') as endNode, ... from mydb.mytable select node_reference('externalId of a node') as startNode, node_reference('externalId of a node') as endNode, ... from mydb.mytable select * from cdf_edges('space of the view: String', 'externalId of the view: String', 'version of the view: String') where startNode = node_reference('space externalId of a node', 'externalId of a node') or node_reference('space externalId of a node', 'externalId of a node')

### type_reference

- **type_reference("space: String", "externalId: String"): STRUCT&lt;"space:String", "externalId:String"&gt;**
- **type_reference("externalId: String"): STRUCT&lt;"space:String", "externalId:String"&gt;**

All edges have `type`. To filter edges based on `type`, use `type_reference` and provide the _space_ `externalId` and the _edge type_ `externalId`. If you're writing edges with a `view` reference, you must specify the edge type using `type_reference`.

`type_reference` accepts the single parameter `externalId` of the edge type. The target/instance space set at the transformation is used as the _space_ `externalId` of the edge type.

:::caution important
If you're using `type_reference` for filtering i.e. in your `where` clause, you must add the _space_ `externalId` and the _edge type_ `externalId`.
:::

**Example**

```sql showLineNumbers
select node_reference('space externalId of a node', 'externalId of a node') as startNode, type_reference('space externalId of a node', 'externalId of a node') as endNode, ... from mydb.mytable
select * from cdf_edges('space of the view: String', 'externalId of the view: String', 'version of the view: String') where type = type_reference('space externalId of a node', 'externalId of a node') or type_reference('space externalId of a node', 'externalId of a node')
select * from cdf_edges() where type = type_reference('space externalId of a node', 'externalId of a node') or type_reference('space externalId of a node', 'externalId of a node')

cdf_data_models

  • cdf_data_models("data model space: String", "data model externalId: String", "data model version: String", "type external id: String" ): Table[Nodes]
  • cdf_data_models("data model space: String", "data model externalId: String", "data model version: String", "type external id: String", "property in type containing the relationship: String" ): Table[Edges]

These functions follow the data model UI lingo and make it easy to retrieve the data written to types and relationship.

To retrieve data from a type in your data model, provide the data model's space, externalId, version and the externalId of the type as input parameters to cdf_data_models.

To retrieve data from a relationship in your data model, provide the data model's space, externalId, version,the externalId of the type containing the relationship and the name of the relationship property in the type as input parameters to cdf_data_models.

Example

sql showLineNumbers select * from cdf_data_models('data model space: String', 'data model externalId: String', 'data model version: String', 'type external id: String') select * from cdf_data_models('data model space: String', 'data model externalId: String', 'data model version: String', 'type external id: String', 'property in type where relationship is defined: String')

Disabled Spark SQL functions

We currently don't support using these Spark SQL functions when you transform data:

xpath

xpath_boolean

xpath_double

xpath_float

xpath_int

xpath_number

xpath_short

xpath_string

xpath_long

java_method

reflect