--
-- basic assumptions:
-- partition function is RANGE LEFT
-- partition column is of data type int
-- exec [usp_Util_InsertPartition] 'PartitionTestNo', 100
-- exec [usp_Util_InsertPartition] 'PartitionTest', 2000
-- exec [usp_Util_InsertPartition] 'PartitionTest', 0
-- Basic assumptions (some minor changes are needed to extend the functionality of this sp):
-- 1) the partition range is LEFT
-- 2) the partition parameter has an integer type
-- when @usePreviousFileGroup=1, the file group of the leftmost partition value will be used and @fileSize and @fileGrowth will be ignored;
-- otherwise the partition number will be appended to form the file group name and a new file will be added to the new file group
CREATE PROCEDURE [usp_Util_InsertPartition]
     @tableName varchar(255)
    ,@partitionValue int
    ,@schemaName varchar(255)='dbo'
    ,@usePreviousFileGroup tinyint=1
    ,@fileSize varchar(255)='50MB'
    ,@fileGrowth varchar(255)='10'
AS
BEGIN
    --
    -- Purpose:
    --   Empties one partition of a partitioned table. The partition is located by
    --   its boundary value (@partitionValue must equal a boundary of a RANGE LEFT
    --   partition function exactly). An empty clone table named
    --   <table>_<partitionNumber> is created in the same schema and on the same
    --   filegroup as that partition, the partition is switched into the clone,
    --   and the clone is dropped.
    --
    -- NOTE(review): despite its name, nothing in this body inserts or splits a
    --   partition; @usePreviousFileGroup, @fileSize and @fileGrowth are accepted
    --   for interface compatibility but are never read.
    --
    -- Parameters:
    --   @tableName       table to operate on (leading/trailing blanks are trimmed)
    --   @partitionValue  boundary value identifying the partition
    --   @schemaName      schema of the table, default 'dbo'
    --
    -- Return codes:
    --   0    success
    --   100  table not partitioned (RANGE LEFT, integer boundary) or value is not a boundary
    --   200  the statements for the clone table could not be generated
    --   300  [1] drop of clone primary key failed     310  [1] drop of clone clustered index failed
    --   320  [1] drop of clone table failed           330  create clone table failed
    --   340  create clone clustered index failed      350  clone compression rebuild failed
    --   360  partition switch failed                  370  [2] drop of clone primary key failed
    --   390  [2] drop of clone table failed
    --
    DECLARE @error int, @rowsCount int, @returnCode int, @errMsg varchar(1024), @LogHeader varchar(23)
    DECLARE @partitionSchema varchar(255), @partitionFunction varchar(255), @partitionColumn varchar(255)
           ,@tableStorageType varchar(60), @partitionFunctionID int, @partitionFileGroupName varchar(255)
           ,@databaseName varchar(255), @partitionNumber int, @compression tinyint
    DECLARE @cloneTableName varchar(255), @qSource varchar(600), @qClone varchar(600), @qCloneLiteral varchar(1200)
    DECLARE @dropCloneTable varchar(max), @createCloneTable varchar(max), @dropPrimaryKey varchar(max)
           ,@switchPartition varchar(max), @compressionQuery varchar(max)
           ,@primaryKeyName varchar(512), @hasPrimaryKey tinyint
    DECLARE @indexColums varchar(max), @indexName varchar(512), @indexUnique varchar(8)
           ,@hasNonPrimaryKeyClusterdIndex tinyint, @createClusteredIndex varchar(max), @dropClusteredIndex varchar(max)

    SELECT @tableName = ltrim(rtrim(isnull(@tableName, '')))
          ,@schemaName = ltrim(rtrim(isnull(@schemaName, '')))
          ,@databaseName = db_name()
          ,@compressionQuery = ''

    --
    -- Step 1: find the partition number for the partition value, plus the
    -- partition scheme/function/column/filegroup/compression of the base
    -- structure (heap = index_id 0, clustered index = index_id 1).
    SELECT
         @tableStorageType = i.type_desc
        ,@partitionSchema = ps.name
        ,@partitionFunction = pf.name
        ,@partitionFunctionID = pf.function_id
        ,@partitionNumber = rv.boundary_id
        ,@partitionColumn = c.name
        ,@partitionFileGroupName = fg.name
        -- 0: none, 1: row, 2: page. Taken as-is for heaps too, so that the clone
        -- is rebuilt with the same setting as the partition being switched.
        ,@compression = p.data_compression
    FROM sys.schemas s
    INNER JOIN sys.tables t
        ON t.schema_id = s.schema_id
    INNER JOIN sys.indexes i
        ON i.object_id = t.object_id
       AND i.index_id IN (0, 1)
    INNER JOIN sys.partitions p
        ON p.object_id = i.object_id
       AND p.index_id = i.index_id
    INNER JOIN sys.index_columns ic
        -- partition_ordinal > 0 identifies the partitioning column
        ON ic.object_id = t.object_id
       AND ic.index_id = i.index_id
       AND ic.partition_ordinal > 0
    INNER JOIN sys.columns c
        ON c.object_id = ic.object_id
       AND c.column_id = ic.column_id
    INNER JOIN sys.partition_schemes ps
        ON ps.data_space_id = i.data_space_id
    INNER JOIN sys.partition_functions pf
        ON pf.function_id = ps.function_id
    INNER JOIN sys.destination_data_spaces dds
        ON dds.partition_scheme_id = ps.data_space_id
       AND dds.destination_id = p.partition_number
    INNER JOIN sys.filegroups fg
        ON fg.data_space_id = dds.data_space_id
    INNER JOIN sys.partition_range_values rv
        -- RANGE LEFT: boundary N is the upper bound of partition N
        ON rv.function_id = pf.function_id
       AND rv.boundary_id = p.partition_number
    WHERE s.name = @schemaName
      AND t.name = @tableName
      -- the boundary-to-partition mapping above is only valid for RANGE LEFT
      AND pf.boundary_value_on_right = 0
      -- exact match on the boundary; the CASE keeps the cast away from
      -- non-integer boundary values (e.g. dates, strings)
      AND CASE WHEN cast(sql_variant_property(rv.value, 'BaseType') as varchar(128)) IN ('int', 'smallint', 'tinyint', 'bigint')
               THEN cast(rv.value as bigint)
          END = @partitionValue
    SELECT @error = @@ERROR, @rowsCount = @@ROWCOUNT
    SET @LogHeader = convert(char(23), getdate(), 126)

    IF (@rowsCount <= 0)
    BEGIN
        PRINT @LogHeader+' table '+@tableName+' on db '+@databaseName+' with schema '+@schemaName
            +' is not partitioned or @partitionValue='+isnull(cast(@partitionValue as varchar(16)), 'NULL')
            +' is not in exsiting partition range. will return.'
        RETURN 100
    END

    PRINT @LogHeader+' table '+@tableName+' on db '+@databaseName+' with schema '
        +@schemaName+' is partitioned: @partitionSchema='+@partitionSchema
        +', @partitionFunction='+@partitionFunction
        +', @partitionFunctionID='+cast(@partitionFunctionID as varchar(16))
        +', @partitionColumn='+@partitionColumn
        +', @tableStorageType='+@tableStorageType
        +', @partitionValue='+cast(@partitionValue as varchar(16))
        +', @partitionNumber='+cast(@partitionNumber as varchar(16))
        +', @partitionFileGroupName='+@partitionFileGroupName
        +', @compression='+cast(@compression as varchar(16))

    --
    -- Step 2: names used by every generated statement. All identifiers are
    -- bracket-quoted and always schema-qualified, so create/drop/switch refer
    -- to the same object whatever the caller's default schema is.
    SELECT @cloneTableName = @tableName+'_'+cast(@partitionNumber as varchar(16))
    SELECT @qSource = quotename(@schemaName)+'.'+quotename(@tableName)
          ,@qClone = quotename(@schemaName)+'.'+quotename(@cloneTableName)
    -- the same name, escaped for use inside an N'...' literal
    SELECT @qCloneLiteral = replace(@qClone, '''', '''''')

    SELECT @dropCloneTable =
                'IF EXISTS (SELECT 1 FROM sys.objects WHERE object_id = OBJECT_ID(N'''+@qCloneLiteral+''') AND type in (N''U''))'+char(13)
                +'BEGIN'+char(13)+'DROP TABLE '+@qClone+char(13)+'END'
          ,@switchPartition =
                'ALTER TABLE '+@qSource+' SWITCH PARTITION '+cast(@partitionNumber as varchar(16))+' TO '+@qClone

    --
    -- Step 3: generate CREATE TABLE for the clone (columns, identity, nullability,
    -- defaults) and, when the source has a primary key, the matching
    -- ADD CONSTRAINT. Nothing here reads a variable assigned in the same SELECT.
    SELECT
         @hasPrimaryKey = CASE WHEN tc.CONSTRAINT_NAME IS NULL THEN 0 ELSE 1 END
        ,@primaryKeyName = CASE WHEN tc.CONSTRAINT_NAME IS NULL THEN '' ELSE @cloneTableName+'_'+tc.CONSTRAINT_NAME END
        ,@createCloneTable =
            'CREATE TABLE '+@qClone
            +'('+o.list+')'
            +' ON '+quotename(@partitionFileGroupName)
            + CASE WHEN tc.CONSTRAINT_NAME IS NULL THEN ''
                   ELSE char(13)
                        +'ALTER TABLE '+@qClone+' ADD CONSTRAINT '+quotename(@cloneTableName+'_'+tc.CONSTRAINT_NAME)+' PRIMARY KEY '
                        -- i is only matched when the primary key is the clustered index
                        + CASE WHEN i.index_id IS NULL THEN 'NONCLUSTERED ' ELSE 'CLUSTERED ' END
                        +' ('+LEFT(j.list, LEN(j.list)-1)+')'
              END
    FROM sys.schemas s
    INNER JOIN sys.objects so
        ON so.schema_id = s.schema_id
    CROSS APPLY
        (SELECT
            ' '+quotename(col.COLUMN_NAME)+' '
            + col.DATA_TYPE
            + CASE
                -- types that take no length/precision suffix
                WHEN col.DATA_TYPE IN ('sql_variant', 'text', 'ntext', 'image', 'xml') THEN ''
                WHEN col.DATA_TYPE IN ('decimal', 'numeric')
                    THEN '('+cast(col.NUMERIC_PRECISION as varchar(16))+', '+cast(col.NUMERIC_SCALE as varchar(16))+')'
                WHEN col.DATA_TYPE IN ('datetime2', 'time', 'datetimeoffset')
                    THEN '('+cast(col.DATETIME_PRECISION as varchar(16))+')'
                -- character/binary types; NULL length (int, datetime, ...) yields no suffix
                ELSE coalesce('('+CASE WHEN col.CHARACTER_MAXIMUM_LENGTH = -1 THEN 'MAX'
                                       ELSE cast(col.CHARACTER_MAXIMUM_LENGTH as varchar(16)) END+')', '')
              END
            + ' '
            + CASE WHEN columnproperty(so.object_id, col.COLUMN_NAME, 'IsIdentity') = 1
                   THEN 'IDENTITY('+cast(ident_seed(quotename(s.name)+'.'+quotename(so.name)) as varchar(40))
                        +','+cast(ident_incr(quotename(s.name)+'.'+quotename(so.name)) as varchar(40))+')'
                   ELSE ''
              END
            + ' '
            + CASE WHEN col.IS_NULLABLE = 'NO' THEN 'NOT ' ELSE '' END+'NULL '
            + CASE WHEN col.COLUMN_DEFAULT IS NOT NULL THEN 'DEFAULT '+col.COLUMN_DEFAULT ELSE '' END
            + ', '
         FROM INFORMATION_SCHEMA.COLUMNS col
         WHERE col.TABLE_SCHEMA = s.name
           AND col.TABLE_NAME = so.name
         ORDER BY col.ORDINAL_POSITION
         FOR XML PATH('')) o (list)
    LEFT JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
        ON tc.TABLE_SCHEMA = s.name
       AND tc.TABLE_NAME = so.name
       AND tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
    CROSS APPLY
        (SELECT quotename(kcu.COLUMN_NAME)+', '
         FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
         WHERE kcu.CONSTRAINT_SCHEMA = tc.CONSTRAINT_SCHEMA
           AND kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
         ORDER BY kcu.ORDINAL_POSITION
         FOR XML PATH('')) j (list)
    -- is the primary key the clustered index? (the index carries the constraint's name)
    LEFT JOIN sys.indexes i
        ON i.object_id = so.object_id
       AND i.index_id = 1
       AND i.name = tc.CONSTRAINT_NAME
    WHERE s.name = @schemaName
      AND so.type = 'U'
      AND so.name NOT IN ('dtproperties')
      AND so.name = @tableName
    SELECT @error = @@ERROR, @rowsCount = @@ROWCOUNT
    SELECT @LogHeader = convert(char(23), getdate(), 126), @hasPrimaryKey = isnull(@hasPrimaryKey, 0)
    PRINT @LogHeader+' @hasPrimaryKey='+cast(@hasPrimaryKey as varchar(16))

    SET @dropPrimaryKey = ''
    IF (@hasPrimaryKey = 1)
    BEGIN
        SET @dropPrimaryKey =
            'IF EXISTS (SELECT 1 FROM sys.indexes WHERE object_id = OBJECT_ID(N'''+@qCloneLiteral+''') AND name = N'''+replace(@primaryKeyName, '''', '''''')+''')'+char(13)
            +'BEGIN'+char(13)+'ALTER TABLE '+@qClone+' DROP CONSTRAINT '+quotename(@primaryKeyName)+char(13)+'END'
    END

    -- 1: row compression, 2: page compression
    IF (@compression = 1)
        SET @compressionQuery = 'ALTER TABLE '+@qClone+' REBUILD PARTITION = ALL WITH (DATA_COMPRESSION = ROW)'
    ELSE IF (@compression = 2)
        SET @compressionQuery = 'ALTER TABLE '+@qClone+' REBUILD PARTITION = ALL WITH (DATA_COMPRESSION = PAGE)'

    --
    -- Step 4: a clustered index that is not the primary key must be recreated
    -- on the clone as well (a clustered primary key was handled in step 3).
    SELECT @indexColums = '', @indexUnique = '', @indexName = '', @hasNonPrimaryKeyClusterdIndex = 0
          ,@dropClusteredIndex = '', @createClusteredIndex = ''

    SELECT
         -- any row here means such an index exists (at most one per table)
         @hasNonPrimaryKeyClusterdIndex = 1
        ,@indexColums = ic.columnList
        ,@indexName = @cloneTableName+'_'+i.name
        ,@indexUnique = CASE WHEN i.is_unique = 0 THEN '' ELSE 'UNIQUE ' END
    FROM sys.schemas s
    INNER JOIN sys.tables t
        ON t.schema_id = s.schema_id
    INNER JOIN sys.indexes i
        ON i.object_id = t.object_id
       AND i.index_id = 1
       AND i.is_primary_key = 0
    CROSS APPLY
        (SELECT quotename(c.name)+CASE WHEN ixc.is_descending_key = 0 THEN ' ASC' ELSE ' DESC' END+','
         FROM sys.index_columns ixc
         INNER JOIN sys.columns c
            ON c.column_id = ixc.column_id
           AND c.object_id = ixc.object_id
         WHERE ixc.index_id = i.index_id
           AND ixc.object_id = t.object_id
         ORDER BY ixc.key_ordinal
         FOR XML PATH('')) ic (columnList)
    WHERE s.name = @schemaName
      AND t.name = @tableName
    SELECT @error = @@ERROR, @rowsCount = @@ROWCOUNT
    SELECT @LogHeader = convert(char(23), getdate(), 126)
          ,@hasNonPrimaryKeyClusterdIndex = isnull(@hasNonPrimaryKeyClusterdIndex, 0)
    PRINT @LogHeader+' @hasNonPrimaryKeyClusterdIndex='+cast(@hasNonPrimaryKeyClusterdIndex as varchar(16))

    IF (@hasNonPrimaryKeyClusterdIndex >= 1)
    BEGIN
        SELECT @dropClusteredIndex =
                    'IF EXISTS (SELECT 1 FROM sys.indexes WHERE object_id = OBJECT_ID(N'''+@qCloneLiteral+''') AND name = N'''+replace(@indexName, '''', '''''')+''')'
                    +char(13)+'BEGIN'+char(13)
                    +'DROP INDEX '+quotename(@indexName)+' ON '+@qClone
                    +char(13)+'END'
              -- the column list carries a trailing comma that is cut off here
              ,@createClusteredIndex =
                    'CREATE '+@indexUnique+'CLUSTERED INDEX '+quotename(@indexName)+' ON '+@qClone
                    +'('+left(@indexColums, len(@indexColums)-1)+')'
    END

    -- A NULL statement means a piece of it could not be built (for example a
    -- generated name longer than an identifier may be, for which QUOTENAME
    -- yields NULL). EXEC of NULL would silently do nothing, so stop here.
    IF (   @dropCloneTable IS NULL OR @createCloneTable IS NULL OR @switchPartition IS NULL
        OR @dropPrimaryKey IS NULL OR @dropClusteredIndex IS NULL OR @createClusteredIndex IS NULL)
    BEGIN
        SELECT @returnCode = 200
              ,@errMsg = convert(char(23), getdate(), 126)+' failed to generate the statements for the clone table. will return.'
        GOTO ERROR
    END

    -- log every statement in execution order (printed one by one so that a
    -- long statement does not hide the others)
    SET @LogHeader = convert(char(23), getdate(), 126)
    PRINT @LogHeader+' (@dropPrimaryKey, @dropClusteredIndex, @dropCloneTable, @createCloneTable, @createClusteredIndex, @compressionQuery, @switchPartition, @dropPrimaryKey, @dropCloneTable)'
    PRINT @dropPrimaryKey
    PRINT @dropClusteredIndex
    PRINT @dropCloneTable
    PRINT @createCloneTable
    PRINT @createClusteredIndex
    PRINT @compressionQuery
    PRINT @switchPartition
    PRINT @dropPrimaryKey
    PRINT @dropCloneTable

    --
    -- Step 5: execute. Note: no transaction is used here (see Msg 226:
    -- "ALTER DATABASE statement not allowed within multi-statement transaction").
    -- @@ERROR is captured in the statement immediately following each EXEC.

    -- 5a) remove any left-over clone from a previous run
    IF (@hasPrimaryKey = 1)
    BEGIN
        EXEC(@dropPrimaryKey)
        SELECT @error = @@ERROR
        IF (@error <> 0)
        BEGIN
            SELECT @returnCode = 300, @errMsg = convert(char(23), getdate(), 126)+' [1] failed to drop primary key for clone table. will return.'
            GOTO ERROR
        END
    END

    IF (@hasNonPrimaryKeyClusterdIndex = 1)
    BEGIN
        EXEC(@dropClusteredIndex)
        SELECT @error = @@ERROR
        IF (@error <> 0)
        BEGIN
            SELECT @returnCode = 310, @errMsg = convert(char(23), getdate(), 126)+' [1] failed to drop clustered index for clone table. will return.'
            GOTO ERROR
        END
    END

    EXEC(@dropCloneTable)
    SELECT @error = @@ERROR
    IF (@error <> 0)
    BEGIN
        SELECT @returnCode = 320, @errMsg = convert(char(23), getdate(), 126)+' [1] failed to drop clone table. will return.'
        GOTO ERROR
    END

    -- 5b) build the empty clone
    EXEC(@createCloneTable)
    SELECT @error = @@ERROR
    IF (@error <> 0)
    BEGIN
        SELECT @returnCode = 330, @errMsg = convert(char(23), getdate(), 126)+' failed to create clone table. will return.'
        GOTO ERROR
    END

    IF (@hasNonPrimaryKeyClusterdIndex = 1)
    BEGIN
        EXEC(@createClusteredIndex)
        SELECT @error = @@ERROR
        IF (@error <> 0)
        BEGIN
            SELECT @returnCode = 340, @errMsg = convert(char(23), getdate(), 126)+' failed to create clustered index for clone table. will return.'
            GOTO ERROR
        END
    END

    IF (@compression IN (1, 2))
    BEGIN
        EXEC(@compressionQuery)
        SELECT @error = @@ERROR
        IF (@error <> 0)
        BEGIN
            SELECT @returnCode = 350, @errMsg = convert(char(23), getdate(), 126)+' failed to adjust clone table compressoin option. will return.'
            GOTO ERROR
        END
    END

    -- 5c) switch the partition into the clone; this is what empties it
    EXEC(@switchPartition)
    SELECT @error = @@ERROR
    IF (@error <> 0)
    BEGIN
        SELECT @returnCode = 360, @errMsg = convert(char(23), getdate(), 126)+' failed to switch partition to clone table. will return.'
        GOTO ERROR
    END

    -- 5d) discard the clone together with the switched-out rows. The clustered
    -- index is not dropped separately; it goes away with the table.
    IF (@hasPrimaryKey = 1)
    BEGIN
        EXEC(@dropPrimaryKey)
        SELECT @error = @@ERROR
        IF (@error <> 0)
        BEGIN
            SELECT @returnCode = 370, @errMsg = convert(char(23), getdate(), 126)+' [2] failed to drop primary key for clone table. will return.'
            GOTO ERROR
        END
    END

    EXEC(@dropCloneTable)
    SELECT @error = @@ERROR
    IF (@error <> 0)
    BEGIN
        SELECT @returnCode = 390, @errMsg = convert(char(23), getdate(), 126)+' [2] failed to drop clone table. will return.'
        GOTO ERROR
    END

    RETURN 0

ERROR:
    PRINT @errMsg
    RETURN @returnCode
END
-- No comments:
-- Post a Comment