Saturday, November 19, 2011

Field Level Auditing using Change Data Capture – Part 2

SQL Server 2008 introduced a new auditing tool called Change Data Capture (CDC). Using CDC, it is straightforward to track field-level changes.

In the first part of this series, we looked at the basics of Change Data Capture and enabled CDC on a sample database and table. We then looked at some of the CDC functions and stored procedures that can be used to configure and query the change table data. One of the jobs created with CDC is a cleanup job that deletes data from the change table after a retention period (three days by default). So unless we disable this cleanup job, we need to move the change table data somewhere more permanent. In this article we will build on some of the stored procedures and functions used in Part 1 to create a simple data warehouse to hold the change data, and then create a report using SSRS to view that data.
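
As a quick check before building anything, the current retention period can be read from the CDC job configuration. The sketch below assumes the CDCDemo source database from Part 1 with CDC already enabled. The 10080-minute value is only an illustrative choice, not a setting used elsewhere in this series.

```sql
USE [CDCDemo]
GO
-- List the CDC capture and cleanup jobs; the retention column is in minutes.
EXEC sys.sp_cdc_help_jobs;
GO
-- Optional: lengthen the cleanup retention to 7 days (10080 minutes).
-- The value is illustrative only.
EXEC sys.sp_cdc_change_job
    @job_type = N'cleanup',
    @retention = 10080;
GO
```

A changed value may only take effect the next time the cleanup job starts, so it is worth running sp_cdc_help_jobs again afterwards to confirm the setting.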

Create the warehouse

Start by creating the data warehouse database:

USE [master]
GO
CREATE DATABASE [CDCDW] ON  PRIMARY
( NAME = N'CDCDW', FILENAME = N'C:\CDCDW.mdf' , SIZE = 2048KB , MAXSIZE = UNLIMITED, FILEGROWTH = 1024KB )
 LOG ON
( NAME = N'CDCDW_log', FILENAME = N'C:\CDCDW_log.ldf' , SIZE = 1024KB , MAXSIZE = 2048GB , FILEGROWTH = 10%)
GO
 
Next create the tables to hold the warehouse data. Include bit flags to indicate which fields changed for each row, plus fields to hold the timestamp derived from the LSN and the name of the user who changed the record, based on the last_change_id field in the source table. There will be two tables, stagingCDCContract and factCDCContract. The staging table will be truncated and repopulated with each SSIS run's data, and the fact table holds all change table data accumulated over time. It is the fact table that will be used for the SSRS report:

USE [CDCDW]
GO
CREATE TABLE [dbo].[stagingCDCContract](
      [__$start_lsn] [binary](10) NOT NULL,    
      [__$seqval] [binary](10) NOT NULL,
      [__$operation] [int] NOT NULL,
      [__$update_mask] [varbinary](128) NULL,
      [id] [int] NOT NULL,
      [name] [varchar](50) NULL,
      [year] [int] NULL,
      [length] [int] NULL,
      [type] [int] NULL,
      [amount] [decimal](18, 2) NULL,
      [last_change_id] [int] NULL,
      [last_change_date] [datetime] NULL,
      [last_change_name] [varchar](100) NULL,  
      [IsNameUpdated] bit null default (0),    
      [IsYearUpdated] bit null default (0),
      [IsLengthUpdated] bit null default (0),
      [IsTypeUpdated] bit null default (0),
      [IsAmountUpdated] bit null default (0),
      [IsLastChangeIdUpdated] bit null default (0)
) ON [PRIMARY]
GO
CREATE TABLE [dbo].[factCDCContract](
      [__$start_lsn] [binary](10) NOT NULL,    
      [__$seqval] [binary](10) NOT NULL,
      [__$operation] [int] NOT NULL,
      [__$update_mask] [varbinary](128) NULL,
      [id] [int] NOT NULL,
      [name] [varchar](50) NULL,
      [year] [int] NULL,
      [length] [int] NULL,
      [type] [int] NULL,
      [amount] [decimal](18, 2) NULL,
      [last_change_id] [int] NULL,
      [last_change_date] [datetime] NULL,
      [last_change_name] [varchar](100) NULL,  
      [IsNameUpdated] bit null default (0),    
      [IsYearUpdated] bit null default (0),
      [IsLengthUpdated] bit null default (0),
      [IsTypeUpdated] bit null default (0),
      [IsAmountUpdated] bit null default (0),
      [IsLastChangeIdUpdated] bit null default (0),
      CONSTRAINT [PK_factCDCContract] PRIMARY KEY CLUSTERED
(
      [__$start_lsn] ASC,    
      [__$seqval] ASC,
      [__$operation] ASC,    
      [id]
)WITH (PAD_INDEX  = OFF, STATISTICS_NORECOMPUTE  = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS  = ON, ALLOW_PAGE_LOCKS  = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
 
In this example, all the staging fields map directly to the fact table fields. In real life there could be additional mapping to other dimension tables, especially if this data needs to be loaded into an SSAS cube.

We will also create a history table to store the last run time for each change table and initialize it with a starting date (September 1, 2011 in this example):

CREATE TABLE [dbo].[cdcHistory](
      [cdc_table] [varchar](50) NULL,
      [cdc_date] [datetime] NULL
) ON [PRIMARY]
GO
insert into [cdcHistory]
  values ('dbo_contract', '9/1/2011')

 

Add the Stored Procedures

Next we need to create a stored procedure that can be used to pull the change table source data. It uses the same basic syntax as the example in Part 1 that maps the update mask to individual field changes. It also converts the __$start_lsn to a change date and joins to a Users table to get the name of the user who changed the data. This data will be needed for the report.

USE [CDCDemo]
go
CREATE PROCEDURE [dbo].[spgGetCDCContract]
 @FromLSNDate as datetime,
 @ToLSNDate as datetime
AS
begin
SET NOCOUNT ON
SET ROWCOUNT 0
declare @from_lsn binary(10)
declare @to_lsn binary(10)
SET @from_lsn = sys.fn_cdc_map_time_to_lsn('largest less than', @FromLSNDate);
SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', @ToLSNDate);
if (@from_lsn is not null and @to_lsn is not null and @from_lsn < @to_lsn)
 begin

  DECLARE @name_ordinal int
  DECLARE @year_ordinal int
  DECLARE @length_ordinal int
  DECLARE @type_ordinal int
  DECLARE @amount_ordinal int
  DECLARE @last_change_id_ordinal int

  SET @name_ordinal = sys.fn_cdc_get_column_ordinal('dbo_contract','name')
  SET @year_ordinal = sys.fn_cdc_get_column_ordinal('dbo_contract','year')
  SET @length_ordinal = sys.fn_cdc_get_column_ordinal('dbo_contract','length')
  SET @type_ordinal = sys.fn_cdc_get_column_ordinal('dbo_contract','type')
  SET @amount_ordinal = sys.fn_cdc_get_column_ordinal('dbo_contract','amount')
  SET @last_change_id_ordinal = sys.fn_cdc_get_column_ordinal('dbo_contract','last_change_id')

  -- Return each change row with its commit time, the changing user's name,
  -- and one flag per tracked column decoded from the update mask
  SELECT
    c.*,
    sys.fn_cdc_map_lsn_to_time(c.__$start_lsn) as 'last_change_date',
    cast(isnull(u.user_name, '') as varchar(100)) as 'last_change_name',
    sys.fn_cdc_is_bit_set(@name_ordinal, c.__$update_mask) as 'IsNameUpdated',
    sys.fn_cdc_is_bit_set(@year_ordinal, c.__$update_mask) as 'IsYearUpdated',
    sys.fn_cdc_is_bit_set(@length_ordinal, c.__$update_mask) as 'IsLengthUpdated',
    sys.fn_cdc_is_bit_set(@type_ordinal, c.__$update_mask) as 'IsTypeUpdated',
    sys.fn_cdc_is_bit_set(@amount_ordinal, c.__$update_mask) as 'IsAmountUpdated',
    sys.fn_cdc_is_bit_set(@last_change_id_ordinal, c.__$update_mask) as 'IsLastChangeIdUpdated'
   FROM cdc.fn_cdc_get_all_changes_dbo_contract(@from_lsn, @to_lsn, 'all') as c
     left join Users as u
       on u.user_id = c.last_change_id
 end
else
 begin
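  -- no changes in range: return an empty result set with the same columns
  -- as the branch above (the change table's __$end_lsn column is left out
  -- because the CDC query function does not return it)
  select
    __$start_lsn,
    __$seqval,
    __$operation,
    __$update_mask,
    [id],
    [name],
    [year],
    [length],
    [type],
    [amount],
    [last_change_id],
    getdate() as 'last_change_date',
    cast('' as varchar(100)) as 'last_change_name',
    cast(0 as bit) as 'IsNameUpdated',
    cast(0 as bit) as 'IsYearUpdated',
    cast(0 as bit) as 'IsLengthUpdated',
    cast(0 as bit) as 'IsTypeUpdated',
    cast(0 as bit) as 'IsAmountUpdated',
    cast(0 as bit) as 'IsLastChangeIdUpdated'
   from cdc.dbo_contract_CT
   where __$start_lsn = 0x00000000000000000000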
 end
end
GO
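
A quick way to try the procedure from a query window is sketched below. It assumes CDC has been capturing changes on the contract table since Part 1. The start date is only a placeholder and should be replaced with a date inside the retention window.

```sql
USE [CDCDemo]
GO
-- Placeholder date range; adjust to a window that contains captured changes.
DECLARE @from datetime, @to datetime;
SET @from = '20110901';
SET @to   = GETDATE();

-- Returns the change rows plus last_change_date, last_change_name
-- and the Is...Updated flags for the requested window.
EXEC [dbo].[spgGetCDCContract]
    @FromLSNDate = @from,
    @ToLSNDate   = @to;
GO
```

If the from date maps to an LSN older than the capture instance's minimum LSN, the underlying CDC query function can raise an error, so a date inside the retention window is the safer choice.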

This stored procedure takes two parameters, a start LSN date and an end LSN date. It uses the function fn_cdc_map_time_to_lsn to map each date to an LSN. The start LSN date will be the last date in the cdcHistory table, so that each run starts where the previous one ended. Create a stored procedure to get that date:

USE [CDCDW]
GO
CREATE PROCEDURE [dbo].[spgGetCDCHistoryLastLSNDate]
  @cdcTable as varchar(50)
AS
begin
SET NOCOUNT ON
SET ROWCOUNT 0
select max(cdc_date) as 'LastCDCLSNDate'
 from cdcHistory
 where cdc_table = @cdcTable
end
GO
 
The end LSN date will be the maximum date in the lsn_time_mapping table. Use the functions fn_cdc_get_max_lsn and fn_cdc_map_lsn_to_time to get this date. Create another stored procedure to return that date from the CDC source database:

USE [CDCDemo]
go
CREATE PROCEDURE [dbo].[spgGetCDCMaxLSNDate]
AS
begin
SET NOCOUNT ON
SET ROWCOUNT 0
declare @maxCDCDate as datetime
declare @to_lsn binary(10)
set @to_lsn = sys.fn_cdc_get_max_lsn();
set @maxCDCDate = sys.fn_cdc_map_lsn_to_time(@to_lsn);
select @maxCDCDate as 'MaxCDCLSNDate'
end
GO
 
Finally, create a stored procedure to set the last run date in the history table when the job completes:

USE [CDCDW]
GO
CREATE PROCEDURE [dbo].[spgSetCDCHistory]
 @cdcTable as varchar(50),
 @cdcDate as datetime
AS
begin
SET NOCOUNT ON
SET ROWCOUNT 0
insert into [cdcHistory]
 values(@cdcTable, @cdcDate)
end
GO
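
Before wiring these procedures into a package, it can help to run the whole sequence once by hand. The following is a minimal sketch, not part of the original solution. It assumes CDCDemo and CDCDW live on the same SQL Server instance, and that spgGetCDCContract returns exactly the staging table's columns in the same order. The @result table variable and its column d are names made up for this example.

```sql
DECLARE @from datetime, @to datetime;
DECLARE @result TABLE (d datetime);   -- hypothetical helper, not in the article

-- 1. Last run date from the warehouse history table
INSERT INTO @result
    EXEC [CDCDW].[dbo].[spgGetCDCHistoryLastLSNDate] @cdcTable = 'dbo_contract';
SELECT @from = d FROM @result;
DELETE FROM @result;

-- 2. Latest change time available in the CDC source database
INSERT INTO @result
    EXEC [CDCDemo].[dbo].[spgGetCDCMaxLSNDate];
SELECT @to = d FROM @result;

-- 3. Reload the staging table with this run's change rows
TRUNCATE TABLE [CDCDW].[dbo].[stagingCDCContract];
INSERT INTO [CDCDW].[dbo].[stagingCDCContract]
    EXEC [CDCDemo].[dbo].[spgGetCDCContract]
        @FromLSNDate = @from,
        @ToLSNDate   = @to;

-- 4. Append only the rows whose LSN is not already in the fact table
INSERT INTO [CDCDW].[dbo].[factCDCContract]
SELECT s.*
  FROM [CDCDW].[dbo].[stagingCDCContract] AS s
 WHERE s.__$start_lsn NOT IN (SELECT f.__$start_lsn
                                FROM [CDCDW].[dbo].[factCDCContract] AS f);

-- 5. Record the end of this run's window, skipping the write if CDC had no data
IF @to IS NOT NULL
    EXEC [CDCDW].[dbo].[spgSetCDCHistory]
        @cdcTable = 'dbo_contract',
        @cdcDate  = @to;
```

Running the script twice in a row should leave the fact table unchanged the second time, since step 4 filters out rows that were already loaded.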

Create the package

Now we can create a simple SSIS package that can be used to pull the change data from the change table and insert it into the data warehouse. Start by creating a new SSIS project, a package to hold the tasks, and two data sources, one for the CDC source database and the other for the data warehouse:



Next add a sequence container to the package. Create two variables to hold the stored procedure parameters FromLSNDate and ToLSNDate, making sure to highlight the container so the variable scope is set at the container level. Set the variable Type to DateTime:



Now add the tasks to move the change table data. First add an Execute SQL Task to set the FromLSNDate variable to the last run date. Right click and select Edit. Set ResultSet to Single Row, set Connection to the data warehouse data source, and set SQL Statement to call the stored procedure spgGetCDCHistoryLastLSNDate, passing in the table name as a parameter:

exec [spgGetCDCHistoryLastLSNDate] @cdcTable='dbo_contract'



To assign the return value to the variable, click Result Set and add a Result Name value for the result LastCDCLSNDate returned from the stored procedure and map it to the variable FromLSNDate:



Click OK to save. Next add a second Execute SQL Task to set the ToLSNDate variable. Right click and select Edit, set the ResultSet to Single Row, set the Connection to the source database data source, and set the SQL Statement to:

exec [spgGetCDCMaxLSNDate]
 




Click the Result Set option and add a Result Name value for the result MaxCDCLSNDate returned from the stored procedure and map it to the variable ToLSNDate:




 
Click OK to save. Next add a third Execute SQL task to truncate the table stagingCDCContract so it can be populated with this run’s data:
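
A SQL Statement along these lines should work for this task, assuming the connection points at the data warehouse and the staging table lives in the dbo schema as created earlier:

```sql
-- Empty the staging table before each load
TRUNCATE TABLE [dbo].[stagingCDCContract]
```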




 
Now we are ready to copy the change table data to the staging table in the data warehouse. Add a Data Flow task, right click and select Edit, and add an OLE DB Source data flow source and an OLE DB Destination data flow destination. Right click and select Edit on the OLE DB Source, set the connection manager to the CDC database, select SQL Command for the Data access mode, and enter this command to call the stored procedure:

exec [spgGetCDCContract] @FromLSNDate=?, @ToLSNDate=?



 
Click the Parameters button and set the input parameters to the corresponding variables set in the previous tasks:




 
Click OK to save. On the OLE DB Destination, set the connection to the data warehouse data source, set the mode to Table or view – fast load, and select the staging table as the name of the table:




 
Click OK to save. Next add another Execute SQL task to copy the staging table data to the fact table. Set the Result Set to None, the connection to the data warehouse data source, and the SQL Statement to:

insert into factCDCContract
select * 
 from stagingCDCContract
 where __$start_lsn not in (select __$start_lsn
                             from factCDCContract)


 
Click OK to save. Finally, add one last Execute SQL task to update the history table with the last run date. Set the Result Set to None, the connection to the data warehouse data source, and the SQL Statement to:

exec [spgSetCDCHistory] @cdcTable='dbo_contract', @cdcDate=?
 



 
Set the Parameter Mapping section to set the stored procedure parameter to the variable ToLSNDate. Set the Data Type to DATE:




 
Click OK to save. You should have a complete package that looks something like this:



But does it work?

Run the package by right clicking on the container and selecting Execute Container. Every task should turn green to indicate success:



And looking in the data warehouse factCDCContract table, we can see the results:
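
A query along these lines lists the loaded rows in change order. It is only a sketch for inspecting the table. The CASE expression translates the standard CDC __$operation codes into readable labels.

```sql
USE [CDCDW]
GO
SELECT
    id,
    last_change_date,
    last_change_name,
    -- CDC operation codes: 1 = delete, 2 = insert,
    -- 3 = update (before image), 4 = update (after image)
    CASE __$operation
        WHEN 1 THEN 'delete'
        WHEN 2 THEN 'insert'
        WHEN 3 THEN 'update (before)'
        WHEN 4 THEN 'update (after)'
    END AS operation,
    IsNameUpdated,
    IsYearUpdated,
    IsLengthUpdated,
    IsTypeUpdated,
    IsAmountUpdated,
    IsLastChangeIdUpdated
  FROM [dbo].[factCDCContract]
 ORDER BY __$start_lsn, __$seqval;
GO
```

Because the source procedure requests the 'all' row filter, updates should appear only as after images (code 4).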



This SSIS package can now be scheduled to run nightly to move the CDC data to the data warehouse. The CDC cleanup job will delete data after the retention period to manage the source change data tables and keep them from growing too large.

Create the report

Now that we have moved the change table data into the data warehouse, we can create a simple SSRS report to show field-level changes. Create a stored procedure that will pull the warehoused data to display in the report. Create this stored procedure in the data warehouse database, adding Name as a parameter:


use [CDCDW]
go
CREATE PROCEDURE [dbo].[spgGetCDCContractHistory]
 @Name as varchar(50)
AS
begin
SET NOCOUNT ON
SET ROWCOUNT 0
if (@Name is not null)
 begin
  select
    factCDCContract.id,
    __$seqval,
    factCDCContract.name,
    factCDCContract.year,
    factCDCContract.length,
    factCDCContract.type,
    factCDCContract.amount,
    factCDCContract.last_change_id,
    factCDCContract.last_change_date,
    factCDCContract.last_change_name,
    IsNameUpdated,
    IsYearUpdated,
    IsLengthUpdated,
    IsTypeUpdated,
    IsAmountUpdated,
    IsLastChangeIdUpdated
   from factCDCContract
   where factCDCContract.name = @Name
   order by __$seqval desc
 end
end
GO
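
To check the procedure before building the report, it can be called directly. The contract name below is a hypothetical placeholder; substitute a name that actually exists in the fact table.

```sql
USE [CDCDW]
GO
-- 'Contract1' is a made-up name used only for illustration.
EXEC [dbo].[spgGetCDCContractHistory] @Name = 'Contract1';
GO
```
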
Now create a simple SSRS report to pull the change data from the data warehouse and display it in a report. Start by creating a new SSRS project, a report, a shared data source for the data warehouse, a shared dataset for the stored procedure to pull the data, and a parameter called Name:



Next add a report Data Source using the Shared Data Source as the reference:





Add a Data Set using the Shared Dataset for the stored procedure and set the parameter to the Name parameter added earlier:




 
Finally add a Table report item with a column for each source data field in the warehouse change table. Your report should look something like this:



Now let’s add some color. Change the header background color to something besides white and the font weight to Bold. For each of the data fields, add an expression to change the color to Green if the field was updated:




 
Do the same thing for the font weight to change it to Bold if the field was changed:



Now select Preview to see what the results look like:



We now have a simple report that shows the field-level changes for a database record over time. While this is a basic audit report of record changes, the change data can be used for a variety of other reports, from user productivity to daily or weekly load tracking that helps predict resource needs during peak times.

Conclusion

Change Data Capture and Change Tracking introduce built-in database auditing to SQL Server. In these articles we have enabled CDC on a database and table, pulled the change table data into a data warehouse, and created a report to view those changes. Together these pieces deliver a robust auditing solution that shows what is happening to the data in a database over time. What could be better than that?