CDC - Komple CDC script with Merge ve replike kurma
Definition Description
-- =====================================================================
-- Change Data Capture (CDC) - enable / disable runbook
--
-- These are individual snippets. Run them one at a time; do NOT execute
-- this section top to bottom (it contains both enable and disable calls).
--
-- References:
--   https://www.youtube.com/watch?v=IJbAhfFRHdA   -- detailed walkthrough
--   https://www.youtube.com/watch?v=wSQ1GXEwAUQ   -- short summary
--   https://www.simple-talk.com/sql/learn-sql-server/introduction-to-change-data-capture-(cdc)-in-
--     (link is truncated in the original notes)
-- =====================================================================

-- CDC is enabled at two levels.

-- 1 - First, check whether any database already has CDC enabled.
USE master
GO
SELECT
    [name],
    database_id,
    is_cdc_enabled
FROM sys.databases
GO

-- 2 - Enabling CDC takes two steps: database level, then table level.

-- ---------------------------------------------------------------------
-- Step 1 - Database level
-- ---------------------------------------------------------------------

-- ENABLE FOR DATABASE
USE TEST
GO
EXEC sys.sp_cdc_enable_db
GO

-- Enabling CDC creates several tables (listed under "System Tables"):
--   cdc.captured_columns  -- list of captured columns
--   cdc.change_tables     -- list of all tables that are enabled for capture
--   cdc.ddl_history       -- history of all DDL changes since capture was enabled
--   cdc.index_columns     -- indexes associated with each change table
--   cdc.lsn_time_mapping  -- maps LSN numbers to time

-- NOTE: enabling CDC at database level can sometimes fail with:
--
--   Could not update the metadata that indicates database X is enabled for
--   Change Data Capture. The failure occurred when executing the command
--   'SetCDCTracked(Value = 1)'. The error returned was 15517: 'Cannot execute
--   as the database principal because the principal "dbo" does not exist,
--   this type of principal cannot be impersonated, or you do not have
--   permission.'. Use the action and error to determine the cause of the
--   failure and resubmit the request.
--
-- SOLUTION: the owner of the database you are working on must be sa.
-- If it is not sa, change it to sa.
-- !!! Sometimes the owner already shows as sa when you right-click the
-- !!! database and look at the owner in the properties dialog. Run the
-- !!! change-owner procedure anyway.
-- (sp_changedbowner acts on the current database; on current SQL Server
--  versions ALTER AUTHORIZATION ON DATABASE::<name> TO sa is the
--  non-deprecated equivalent.)
EXEC sp_changedbowner 'sa'
GO

-- DISABLE FOR DATABASE
EXEC sys.sp_cdc_disable_db
GO

-- ---------------------------------------------------------------------
-- Step 2 - Table level
-- ---------------------------------------------------------------------

-- First check whether any table is already tracked.
USE AdventureWorks
GO
SELECT
    [name],
    is_tracked_by_cdc
FROM sys.tables
GO

-- ENABLE FOR TABLE
-- (Added: the original notes only showed the disable call. This is its
--  counterpart for the same example table dbo.B. @role_name must be passed
--  explicitly; NULL means access to the change data is not gated by a role.)
EXEC sys.sp_cdc_enable_table
    @source_schema    = N'dbo',    -- schema
    @source_name      = N'B',      -- table name
    @role_name        = NULL,
    @capture_instance = N'dbo_B'
GO

-- DISABLE FOR TABLE
EXEC sys.sp_cdc_disable_table
    @source_schema    = N'dbo',    -- schema
    @source_name      = N'B',      -- table name
    @capture_instance = N'dbo_B'
GO
-- =====================================================================
-- 3 - Table used to track the replication process
--
-- [dbr].[DBA_CDClog] receives one row per CDC sequence value. It is filled
-- by the procedure [dbr].[dba_Replicator_WITH_MERGE] defined in this file.
-- Prerequisites referenced by the DDL below: schema [dbr] and filegroup
-- [CDCTBLOGFG] must already exist in database [GURMEN].
-- =====================================================================
USE [GURMEN]
GO

/****** Object:  Table [dbr].[DBA_CDClog]    Script Date: 18.04.2016 10:33:14 ******/
SET ANSI_NULLS ON
GO

SET QUOTED_IDENTIFIER ON
GO

SET ANSI_PADDING ON
GO

CREATE TABLE [dbr].[DBA_CDClog](
    -- Surrogate key.
    [DBA_CDClogID] [bigint] IDENTITY(1,1) NOT NULL,
    -- CDC sequence value of the change (the procedure copies __$seqval here).
    [SeqValue] [binary](10) NOT NULL,
    -- Schema / table parts derived from the capture instance name.
    [SchemaName] [nvarchar](50) NOT NULL,
    [TableName] [nvarchar](100) NOT NULL,
    -- Row GUID of the changed source row.
    [RowGuid] [uniqueidentifier] NOT NULL,
    -- Key column name(s); the procedure joins composite keys with '||'.
    [PKeyColumnName] [nvarchar](100) NOT NULL,
    -- Key value after / before the change, same '||' layout.
    [PKeyNewValue] [nvarchar](150) NULL,
    [PKeyOldValue] [nvarchar](150) NULL,
    -- The procedure writes 'D' (delete), 'I' (insert) or 'U' (update).
    [Operation] [char](1) NOT NULL,
    -- Defaults to 'P'; presumably "pending" for the downstream consumer - TODO confirm.
    [Status] [char](1) NOT NULL CONSTRAINT [DF_DBA_CDClog_Status] DEFAULT ('P'),
    [LastUpdatedUserName] [nvarchar](100) NOT NULL,
    -- Time the row was written to this log table.
    [CdcLogDate] [datetime] NOT NULL CONSTRAINT [DF_DBA_CDClog_CdcLogDate] DEFAULT (getdate()),
    CONSTRAINT [PK_DBA_CDClog] PRIMARY KEY CLUSTERED
    (
        [DBA_CDClogID] ASC
    ) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, FILLFACTOR = 90) ON [CDCTBLOGFG],
    -- A sequence value may be logged only once; the procedure's MERGE matches on it.
    CONSTRAINT [UNQ_SeqValue] UNIQUE NONCLUSTERED
    (
        [SeqValue] ASC
    ) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, FILLFACTOR = 90) ON [CDCTBLOGFG]
) ON [CDCTBLOGFG]
GO

-- Kept from the generated script: switches the session setting back off.
SET ANSI_PADDING OFF
GO
-- =====================================================================
-- 4 - Stored procedure that performs the replication logging
-- (ALTER PROCEDURE: the procedure must already exist in [GURMEN].)
-- =====================================================================
USE [GURMEN]
GO

/****** Object:  StoredProcedure [dbr].[dba_Replicator_WITH_MERGE]    Script Date: 18.04.2016 10:34:22 ******/
SET ANSI_NULLS ON
GO

SET QUOTED_IDENTIFIER ON
GO

/*
    [dbr].[dba_Replicator_WITH_MERGE]

    Purpose
        For every capture instance registered in [dbr].[DBA_CaptureInstance],
        copies one summary row per CDC sequence value from the instance's
        change table ([cdc].[<capture_instance>_CT]) into [dbr].[DBA_CDClog]
        (prepared for the PostgreSQL integration).

    Steps
        1. Registers any capture instance from [cdc].[change_tables] that is
           not yet in [dbr].[DBA_CaptureInstance] (only TableName is filled).
        2. Per capture instance, builds the key-value expression and the
           key-name string from [cdc].[index_columns]; composite keys are
           joined with '||'.
        3. Runs a MERGE that inserts sequence values not yet present in
           [dbr].[DBA_CDClog]. Rows are taken only when the transaction ended
           more than 30 minutes ago and LastUpdatedUserName is not one of the
           excluded users.

    Parameters
        None.

    Result sets
        One single-value result set (@@ROWCOUNT of the MERGE) per processed
        capture instance. Skipped capture instances return no result set.

    Notes
        - Every tracked table is expected to expose a captured column named
          LastUpdatedUserName; the generated statement references it.
        - Schema and table name are split at the FIRST underscore of the
          capture instance name. NOTE(review): a schema name containing '_'
          would be split in the wrong place - confirm naming convention.
        - NOTE(review): FOR XML PATH('') entitises characters such as & and <
          in key values; confirm keys never contain them.
*/
ALTER PROCEDURE [dbr].[dba_Replicator_WITH_MERGE]
AS
SET NOCOUNT ON;
BEGIN
    -- Register CDC-enabled capture instances that are not known yet.
    MERGE [dbr].[DBA_CaptureInstance] AS TARGET
    USING [cdc].[change_tables] AS SOURCE
        ON (TARGET.TableName = SOURCE.capture_instance)
    WHEN NOT MATCHED BY TARGET THEN
        INSERT (TableName)
        VALUES (SOURCE.capture_instance);

    -- sysname instead of nvarchar(50)/(55): identifiers must not be truncated.
    DECLARE @TableName      sysname;          -- capture instance, e.g. dbo_B
    DECLARE @TableName2     sysname;          -- part after the first underscore
    DECLARE @rowguid        sysname;          -- RowGuidName from DBA_CaptureInstance
    DECLARE @ChangeTable    nvarchar(300);    -- quoted [cdc].[<capture_instance>_CT]
    DECLARE @column         nvarchar(max);    -- SQL expression yielding the key value
    DECLARE @PKeyColumnName nvarchar(4000);   -- key column name(s) as a plain value
    DECLARE @mainsql        nvarchar(max);

    -- LOCAL: the cursor is released with the procedure scope even if a
    -- statement fails, so a failed run cannot block the next one with a
    -- "cursor already exists" error.
    DECLARE column_to_row CURSOR LOCAL FAST_FORWARD FOR
        SELECT
            RowGuidName,
            TableName,
            SUBSTRING(TableName, CHARINDEX('_', TableName) + 1, LEN(TableName)) AS TableName2
        FROM [dbr].[DBA_CaptureInstance] WITH (NOLOCK);

    OPEN column_to_row;

    FETCH NEXT FROM column_to_row INTO @rowguid, @TableName, @TableName2;

    WHILE @@FETCH_STATUS = 0
    BEGIN
        -- Reset the accumulators for every capture instance.
        SET @column         = NULL;
        SET @PKeyColumnName = NULL;
        SET @mainsql        = NULL;
        SET @ChangeTable    = N'[cdc].' + QUOTENAME(@TableName + N'_CT');

        -- Build both strings in one pass so names and values share the same
        -- column order.
        --   @column         -> CAST(ti.[k1] AS VARCHAR (255))+'||'+CAST(ti.[k2] AS VARCHAR (255))
        --   @PKeyColumnName -> k1||k2
        SELECT
            @column = COALESCE(@column + N'+''||''+', N'')
                      + N'CAST(ti.' + QUOTENAME(ic.column_name) + N' AS VARCHAR (255))',
            @PKeyColumnName = COALESCE(@PKeyColumnName + N'||', N'') + ic.column_name
        FROM [cdc].[index_columns] ic WITH (NOLOCK)
        INNER JOIN [cdc].[change_tables] ct
            ON ic.object_id = ct.object_id
        WHERE ct.capture_instance = @TableName;

        -- NOTE(review): the original notes describe limiting the insert to
        -- sequence values above MAX(SeqValue) already logged for the table,
        -- but the value was computed and never applied. The unused lookup is
        -- removed; the MERGE below already skips logged sequence values.
        -- Adding such a watermark needs care because rows are held back by
        -- the 30-minute filter on transaction end time.

        IF @rowguid IS NULL OR @column IS NULL OR @ChangeTable IS NULL
        BEGIN
            -- Previously the statement text silently became NULL and nothing
            -- ran; the skip is now explicit (informational message only).
            RAISERROR (N'dba_Replicator_WITH_MERGE: skipped capture instance "%s" (RowGuidName is not set or no index columns were found).', 10, 1, @TableName) WITH NOWAIT;
        END
        ELSE
        BEGIN
            -- The leading CONVERT keeps every concatenation nvarchar(max) so
            -- the statement cannot be cut off at 4000 characters.
            -- @rowguid is inserted exactly as stored in DBA_CaptureInstance.
            -- NOTE(review): the excluded user list is reproduced verbatim;
            -- 'adminREPADMIN' may be two names fused together - confirm.
            -- PKeyOldValue: operation 3 (row image before an update) is now
            -- included so that the IN (1,3) branch can actually fire.
            SET @mainsql = CONVERT(nvarchar(max), N'') + N'
MERGE [dbr].[DBA_CDClog] AS TARGET
USING (
    SELECT DISTINCT
        R.SeqValue,
        N.SchemaName,
        N.TableName,
        R.[RowGuid],
        @PKeyColumnName AS PKeyColumnName,
        R.PKeyNewValue,
        R.PKeyOldValue,
        CASE R.Operation WHEN 1 THEN ''D'' WHEN 2 THEN ''I'' ELSE ''U'' END AS Operation,
        R.LastUpdatedUserName
    FROM (
        SELECT DISTINCT
            ta.[__$seqval] AS SeqValue,
            (
                SELECT CAST(' + @rowguid + N' AS nvarchar(55))
                FROM ' + @ChangeTable + N' ti WITH (NOLOCK)
                WHERE ta.[__$seqval] = ti.[__$seqval]
                  AND ti.[__$operation] IN (1,2,4)
                FOR XML PATH('''')
            ) AS [RowGuid],
            (
                SELECT CASE WHEN ti.[__$operation] IN (2,4) THEN ' + @column + N' ELSE '''' END
                FROM ' + @ChangeTable + N' ti WITH (NOLOCK)
                WHERE ta.[__$seqval] = ti.[__$seqval]
                  AND ti.[__$operation] IN (1,2,4)
                FOR XML PATH('''')
            ) AS [PKeyNewValue],
            (
                SELECT CASE WHEN ti.[__$operation] IN (1,3) THEN ' + @column + N' ELSE '''' END
                FROM ' + @ChangeTable + N' ti WITH (NOLOCK)
                WHERE ta.[__$seqval] = ti.[__$seqval]
                  AND ti.[__$operation] IN (1,2,3,4)
                FOR XML PATH('''')
            ) AS [PKeyOldValue],
            (
                SELECT CAST(ti.[__$operation] AS nvarchar(2))
                FROM ' + @ChangeTable + N' ti WITH (NOLOCK)
                WHERE ta.[__$seqval] = ti.[__$seqval]
                  AND ti.[__$operation] IN (1,2,4)
                FOR XML PATH('''')
            ) AS [Operation],
            (
                SELECT DISTINCT CAST(ti.[LastUpdatedUserName] AS nvarchar(100))
                FROM ' + @ChangeTable + N' ti WITH (NOLOCK)
                WHERE ta.[__$seqval] = ti.[__$seqval]
                  AND ti.[__$operation] IN (1,2,4)
                FOR XML PATH('''')
            ) AS [LastUpdatedUserName],
            (
                SELECT ls.[tran_end_time]
                FROM ' + @ChangeTable + N' ti WITH (NOLOCK)
                INNER JOIN [cdc].[lsn_time_mapping] ls
                    ON ls.[start_lsn] = ti.[__$start_lsn]
                WHERE ta.[__$seqval] = ti.[__$seqval]
                  AND ti.[__$operation] IN (1,2,4)
            ) AS [TranEndTime]
        FROM ' + @ChangeTable + N' ta
    ) R
    CROSS JOIN (
        SELECT
            SUBSTRING(ct.capture_instance, 1, CHARINDEX(''_'', ct.capture_instance) - 1) AS SchemaName,
            SUBSTRING(ct.capture_instance, CHARINDEX(''_'', ct.capture_instance) + 1, LEN(ct.capture_instance)) AS TableName
        FROM [cdc].[change_tables] ct WITH (NOLOCK)
        WHERE ct.capture_instance = @CaptureInstance
    ) N
    WHERE R.LastUpdatedUserName NOT IN (''adminREPADMIN'',''admine'',''REPKETM'',''REPRETM'')
      AND R.[TranEndTime] < DATEADD(MINUTE, -30, GETDATE())
) AS SOURCE
    ON TARGET.SeqValue = SOURCE.SeqValue
WHEN NOT MATCHED BY TARGET THEN
    INSERT ([SeqValue],[SchemaName],[TableName],[RowGuid],[PKeyColumnName],[PKeyNewValue],[PKeyOldValue],[Operation],[LastUpdatedUserName])
    VALUES (SOURCE.[SeqValue],SOURCE.[SchemaName],SOURCE.[TableName],SOURCE.[RowGuid],SOURCE.[PKeyColumnName],SOURCE.[PKeyNewValue],SOURCE.[PKeyOldValue],SOURCE.[Operation],SOURCE.[LastUpdatedUserName]);

SELECT @@ROWCOUNT;
';

            --PRINT (@mainsql)
            -- Values travel as parameters. The schema/table lookup matches
            -- the exact capture instance instead of the table-name suffix,
            -- so two schemas holding a table of the same name can no longer
            -- produce duplicate SeqValue rows.
            EXEC sys.sp_executesql
                @mainsql,
                N'@PKeyColumnName nvarchar(4000), @CaptureInstance sysname',
                @PKeyColumnName  = @PKeyColumnName,
                @CaptureInstance = @TableName;
        END

        FETCH NEXT FROM column_to_row INTO @rowguid, @TableName, @TableName2;
    END

    CLOSE column_to_row;
    DEALLOCATE column_to_row;
END
GO