Skip to content

conduitio-labs/conduit-connector-sql-server

Repository files navigation

Conduit Connector MICROSOFT SQL SERVER

General

The SQL SERVER connector is one of Conduit plugins. It provides both, a source and a destination SQL SERVER connector.

Prerequisites

How to build it

Run make build.

Testing

Run make test to run all the unit and integration tests.

Destination

The SQL Server Destination takes a sdk.Record and parses it into a valid SQL query.

Configuration Options

Name Description Required Example
connection String line for connection to SQL SERVER. More information about it Connection true sqlserver://sa:password@0.0.0.0?database=mydb
table The name of a table in the database that the connector should write to, by default. true users

Table name

If a record contains a sqlserver.table property in its metadata it will be inserted in that table, otherwise it will fall back to use the table configured in the connector. Thus, a Destination can support multiple tables in a single connector, as long as the user has proper access to those tables.

Source

The source connects to the database using the provided connection and starts creating records for each table row and each detected change.

Configuration options

Name Description Required Example
connection String line for connection to SQL SERVER. More information about it Connection true sqlserver://sa:password@0.0.0.0?database=mydb
table The name of a table in the database that the connector should write to, by default. true users
primaryKey Column name that records should use for their Key fields. true id
orderingColumn The name of a column that the connector will use for ordering rows. Its values must be unique and suitable for sorting, otherwise, the snapshot won't work correctly. true id
column Comma separated list of column names that should be included in each Record's payload. If the field is not empty it must contain values of the primaryKey and orderingColumn fields. By default: all rows false id,name,age
snapshot Whether or not the plugin will take a snapshot of the entire table before starting cdc mode, by default true. false false
batchSize Size of rows batch. By default is 1000 false 100

Snapshot

When the connector first starts, snapshot mode is enabled.

First time when the snapshot iterator starts work, it is get max value from orderingColumn and saves this value to position. The snapshot iterator reads all rows, where orderingColumn values less or equal maxValue, from the table in batches.

Values in the ordering column must be unique and suitable for sorting, otherwise, the snapshot won't work correctly. Iterators saves last processed value from orderingColumn column to position to field SnapshotLastProcessedVal. If snapshot stops it will parse position from last record and will try gets row where {{orderingColumn}} > {{position.SnapshotLastProcessedVal}}

When all records are returned, the connector switches to the CDC iterator.

This behavior is enabled by default, but can be turned off by adding "snapshot":"false" to the Source configuration.

Change Data Capture (CDC)

This connector implements CDC features for DB2 by adding a tracking table and triggers to populate it. The tracking table has the same name as a target table with the prefix CONDUIT_TRACKING_. The tracking table has all the same columns as the target table plus three additional columns:

name description
CONDUIT_TRACKING_ID Autoincrement index for the position.
CONDUIT_OPERATION_TYPE Operation type: insert, update, or delete.
CONDUIT_TRACKING_CREATED_DATE Date when the event was added to the tacking table.

Triggers have name pattern CONDUIT_TRIGGER_{{operation_type}}_{{table}}.

Queries to retrieve change data from a tracking table are very similar to queries in a Snapshot iterator, but with CONDUIT_TRACKING_ID ordering column.

CDC iterator periodically clears rows which were successfully applied from tracking table. It collects CONDUIT_TRACKING_ID inside the Ack method into a batch and clears the tracking table every 5 seconds.

Iterator saves the last CONDUIT_TRACKING_ID to the position from the last successfully recorded row.

If connector stops, it will parse position from the last record and will try to get row where {{CONDUIT_TRACKING_ID}} > {{position.CDCLastID}}.

CDC FAQ

Is it possible to add/remove/rename column to table?

Yes. You have to stop the pipeline and do the same with conduit tracking table. For example:

ALTER TABLE CLIENTS
ADD COLUMN address VARCHAR(18);

ALTER TABLE CONDUIT_TRACKING_CLIENTS
    ADD COLUMN address VARCHAR(18);

I accidentally removed tracking table.

You have to restart pipeline, tracking table will be recreated by connector.

I accidentally removed table.

You have to stop the pipeline, remove the conduit tracking table, and then start the pipeline.

Is it possible to change table name?

Yes. Stop the pipeline, change the value of the table in the Source configuration, change the name of the tracking table using a pattern CONDUIT_TRACKING_{{TABLE}}