DEFINITIONS

Definitions More Info.
Definition ID716
TitleSQL
CategoryNOTES
DefinitionCDC - Komple CDC script with Merge ve replike kurma
Definition Descriptionhttps://www.youtube.com/watch?v=IJbAhfFRHdA
-- Detaylı olan https://www.youtube.com/watch?v=wSQ1GXEwAUQ
-- Kısa ve özet https://www.simple-talk.com/sql/learn-sql-server/introduction-to-change-data-capture-(cdc)-in- CDC is two level
1- Önce, üzerinde CDC olan database var mı onu kontrol edelim
USE master
GO
SELECT [name], database_id, is_cdc_enabled FROM sys.databases GO
2- CDC 2 step dir. DB level ve table level

-- Step
1- Database level
-- ENABLE FOR DATABASE
USE TEST
GO
EXEC sys.sp_cdc_enable_db
-- System Tables altında bazı tablolar oluşturulur cdc.captured_columns
-- This table returns result for list of captured column. cdc.change_tables
-- This table returns list of all the tables which are enabled for capture. cdc.ddl_history
-- This table contains history of all the DDL changes since capture data enabled. cdc.index_columns
-- This table contains indexes associated with change table. cdc.lsn_time_mapping
-- This table maps LSN number (for which we will learn later) and time.
NOT: BAZEN CDC DATABASE LEVEL DA ETKİN ETMEYE ÇALIŞTIĞIMIZDA HATA VEREBİLİR. Could not update the metadata that indicates database X is enabled for Change Data Capture. The failure occurred when executing the command 'SetCDCTracked(Value = 1)'.
The error returned was 15517: 'Cannot execute as the database principal because the principal "dbo" does not exist, this type of principal cannot be impersonated, or you do not have ermission.'.
Use the action and error to determine the cause of the failure and resubmit the request.

ÇÖZÜM: ÇALIŞTIĞINIZ DATABASE OWNER sa OLMALIDIR. EĞER sa değilse SA çeviriniz. ******* !!!!!!
ANCAK BAZEN DATABASE ÜZERİNE GELİP SAĞ KLİK YAPIP OPTİONSDAN OWNER BAKTIĞINIZDA dbowner SA görülebilir.
SİZ GENE DE DBOWNER SA ÇEVİRME PROSEDÜRÜNÜ UYGULAYIN. *********!!!!!!!!
EXEC sp_changedbowner 'sa'
-- DISABLE FOR DATABASE EXEC
sys.sp_cdc_disable_db
-- Step
2- Table level
-- Önce track edilen tablo var mı kontrol ederiz.
USE AdventureWorks
GO
SELECT [name], is_tracked_by_cdc FROM sys.tables GO

-- ENABLE FOR TABLE
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo' ,
@source_name = N'A' ,
@role_name = NULL -- ,
@capture_instance = N'dbo_A' --,
@capture_instance = NULL --,
@supports_net_changes = 1 --,
@captured_column_list = N'AddressID, AddressLine1, City' -- Boş bırakılırsa hepsini alır --,
@filegroup_name = N'PRIMARY';
GO

-- DISABLE FOR TABLE
EXEC sys.sp_cdc_disable_table @source_schema=N'dbo',
-- schema @source_name=N'B',
-- table name @capture_instance=N'dbo_B'

3 - REPLIKE ISLEMINI TAKIP ETMEK ICIN TABLO
USE [GURMEN]
GO
/****** Object: Table [dbr].[DBA_CDClog] Script Date: 18.04.2016 10:33:14 ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
SET ANSI_PADDING ON
GO
CREATE TABLE [dbr].[DBA_CDClog]( [DBA_CDClogID] [bigint] IDENTITY(1,1) NOT NULL,
[SeqValue] [binary](10) NOT NULL,
[SchemaName] [nvarchar](50) NOT NULL,
[TableName] [nvarchar](100) NOT NULL,
[RowGuid] [uniqueidentifier] NOT NULL,
[PKeyColumnName] [nvarchar](100) NOT NULL,
[PKeyNewValue] [nvarchar](150) NULL,
[PKeyOldValue] [nvarchar](150) NULL,
[Operation] [char](1) NOT NULL,
[Status] [char](1) NOT NULL CONSTRAINT [DF_DBA_CDClog_Status] DEFAULT ('P'),
[LastUpdatedUserName] [nvarchar](100) NOT NULL,
[CdcLogDate] [datetime] NOT NULL CONSTRAINT [DF_DBA_CDClog_CdcLogDate] DEFAULT (getdate()),
CONSTRAINT [PK_DBA_CDClog]
PRIMARY KEY CLUSTERED ( [DBA_CDClogID] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, FILLFACTOR = 90) ON [CDCTBLOGFG],
CONSTRAINT [UNQ_SeqValue] UNIQUE NONCLUSTERED ( [SeqValue] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, FILLFACTOR = 90) ON [CDCTBLOGFG] ) ON [CDCTBLOGFG]
GO SET ANSI_PADDING OFF GO

4- REPLIKE ISLEMINI YAPMAK ICIN STORED PROCEDURE
USE [GURMEN]
GO
/****** Object: StoredProcedure [dbr].[dba_Replicator_WITH_MERGE] Script Date: 18.04.2016 10:34:22 ******/
SET ANSI_NULLS ON GO SET QUOTED_IDENTIFIER ON
GO
ALTER PROCEDURE [dbr].[dba_Replicator_WITH_MERGE] AS SET NOCOUNT ON;
BEGIN
--INSERT CDC enabled table [name] to [dbr].[DBA_CaptureInstance]
MERGE [dbr].[DBA_CaptureInstance] AS TARGET
USING [cdc].[change_tables] AS SOURCE ON (TARGET.TableName=SOURCE.capture_instance)
WHEN NOT MATCHED BY TARGET THEN INSERT (TableName) VALUES (SOURCE.capture_instance);
DECLARE @TableName nvarchar(50)
DECLARE @TableName2 nvarchar(50)
DECLARE @rowguid nvarchar(55)
DECLARE column_to_row CURSOR FOR
-- SELECT CDC tables
SELECT RowGuidName,TableName,SUBSTRING(TableName,CHARINDEX('_',TableName)+1,LEN(TableName))AS TableName2
FROM [dbr].[DBA_CaptureInstance] WITH (NOLOCK)
OPEN column_to_row FETCH
NEXT FROM column_to_row INTO @rowguid,@TableName,@TableName2
WHILE @@FETCH_STATUS = 0
BEGIN
---- FOR COMPOSIT KEY VALUE
DECLARE @column nvarchar(1000) = NULL
-- For each, FOR LOOP we have to empty variable result of @column, that why we equal it to NULL value
DECLARE @mainsql nvarchar(max) = NULL
DECLARE @PKeyColumnName nvarchar(100)=NULL
DECLARE @MaxSeqValue binary(10)=NULL
-- IDENTIFY TO ROW PART
SELECT @column =COALESCE(@column + '+''||''+','') +'CAST('+ ic.column_name+' AS VARCHAR (255))'
FROM [cdc].[index_columns] ic
WITH(NOLOCK)
INNER JOIN [cdc].[change_tables] ct ON ic.object_id=ct.object_id
WHERE ct.capture_instance=@TableName
-- DEFINE PKeyColumnName
SELECT @PKeyColumnName = COALESCE(@PKeyColumnName +'+''||''+','') +''''+column_name+''''
FROM [cdc].[index_columns]ic WITH(NOLOCK)
INNER JOIN [cdc].[change_tables] ct ON ic.object_id=ct.object_id
WHERE ct.capture_instance = @TableName
/* DBA_CDClog tablosuna hızlı insert etmek için, DBA_CDCLOG tablosundaki tablolara ait maxseqvalue değerlerini alıp,sistem tablosu olan,
CDC tablolarından maxdeğeri,DBA_CDClog seqvalue değerinden büyük olnaları DBA_CDClog tablosu içine inser ediyoruz. */
SELECT @MaxSeqValue = MAX(seqvalue)
FROM [dbr].[DBA_CDClog] WITH(NOLOCK)
WHERE TableName = @TableName2
-- PREPARE DATA FOR POSTGRESQL INTEGRATION
set @mainsql= ' MERGE [dbr].[DBA_CDClog] AS TARGET
USING( SELECT
-- Starts CDLOG table structure.
DISTINCT SeqValue,
N.SchemaName,
N.TableName,
R.[RowGuid], '+@PKeyColumnName+'AS PKeyColumnName,
R.PKeyNewValue,
R.PKeyOldValue,
CASE R.Operation WHEN 1 THEN ''D'' WHEN 2 THEN ''I'' ELSE ''U'' END AS Operation,
R.LastUpdatedUserName
FROM ( SELECT DISTINCT ta.[__$seqval] AS SeqValue,
-- OLD-NEW-AND OPTION fields converted COLUMN TO ROWS with XML format
( SELECT CAST('+@rowguid+' AS nvarchar(55))
FROM [cdc].'+@TableName+'_CT ti WITH(NOLOCK)
WHERE ta.[__$seqval] = ti.[__$seqval] AND ti.[__$operation] IN (1,2,4)
FOR XML PATH('''') )[RowGuid], ( SELECT CASE WHEN ti.[__$operation] IN (2,4) THEN '+@column+' ELSE '''' END
FROM [cdc].'+@TableName+'_CT ti WITH(NOLOCK)
WHERE ta.[__$seqval] = ti.[__$seqval] AND ti.[__$operation] IN (1,2,4) FOR XML PATH('''') ) AS [PKeyNewValue],
( SELECT CASE WHEN ti.[__$operation] IN (1,3) THEN '+@column+' ELSE '''' END
FROM [cdc].'+@TableName+'_CT ti WITH(NOLOCK)
WHERE ta.[__$seqval] = ti.[__$seqval]
AND ti.[__$operation] IN (1,2,4)
FOR XML PATH('''') ) AS [PKeyOldValue],
( SELECT CAST(ti.[__$operation] AS nvarchar(2))
FROM [cdc].'+@TableName+'_CT ti WITH(NOLOCK)
WHERE ta.[__$seqval] = ti.[__$seqval] AND ti.[__$operation] IN (1,2,4)
FOR XML PATH('''') ) AS [Operation],
( SELECT DISTINCT CAST(ti.[LastUpdatedUserName] AS nvarchar(100))
FROM [cdc].'+@TableName+'_CT ti WITH(NOLOCK)
WHERE ta.[__$seqval] = ti.[__$seqval] AND ti.[__$operation] IN (1,2,4)
FOR XML PATH('''') ) AS [LastUpdatedUserName],
( SELECT ls.[tran_end_time] FROM [cdc].'+@TableName+'_CT ti WITH(NOLOCK)
INNER JOIN [cdc].[lsn_time_mapping] ls ON ls.[start_lsn]=ti.[__$start_lsn]
WHERE ta.[__$seqval] = ti.[__$seqval]
AND ti.[__$operation] IN (1,2,4) ) AS [TranEndTime]
FROM [cdc].'+@TableName+'_CT ta ) R
CROSS JOIN ( SELECT DISTINCT ic.column_name AS PKeyColumnName,
SUBSTRING(ct.capture_instance,1,CHARINDEX(''_'',ct.capture_instance)-1) as SchemaName,
SUBSTRING(ct.capture_instance,CHARINDEX(''_'',ct.capture_instance)+1,LEN(ct.capture_instance)) AS TableName
FROM [cdc].[change_tables] ct WITH(NOLOCK)
INNER JOIN [cdc].[index_columns] ic ON ic.object_id=ct.object_id
WHERE SUBSTRING(ct.capture_instance,CHARINDEX(''_'',ct.capture_instance)+1,LEN(ct.capture_instance))= '''+@TableName2+''' ) N
-- Table For Schema
WHERE N.TableName = '''+@TableName2+'''
AND R.LastUpdatedUserName NOT IN (''adminREPADMIN'',''admine'',''REPKETM'',''REPRETM'')
AND R.[TranEndTime] < DateAdd(MINUTE, -30, getDate()) ) AS SOURCE ON TARGET.SeqValue = SOURCE.SeqValue
WHEN NOT MATCHED BY TARGET THEN
INSERT ([SeqValue],[SchemaName],[TableName],[RowGuid],[PKeyColumnName],[PKeyNewValue],[PKeyOldValue],[Operation],[LastUpdatedUserName])
VALUES (SOURCE.[SeqValue],SOURCE.[SchemaName],SOURCE.[TableName],SOURCE.[RowGuid],SOURCE.[PKeyColumnName],SOURCE.[PKeyNewValue],SOURCE.[PKeyOldValue],SOURCE.[Operation],SOURCE.[LastUpdatedUserName]);
SELECT @@ROWCOUNT '
--print (@mainsql)
exec (@mainsql) FETCH NEXT FROM column_to_row INTO @rowguid,@TableName,@TableName2 END CLOSE column_to_row DEALLOCATE column_to_row END
RecordBycunay
Record Date02-03-2016 11:03:19
Düzenle
Kopyala
Sil