我有一个表中的记录审计。 1列或多列的变化有多个列,并且每个记录的状态。
我需要返回的审计结果,其中复原模式将是: column
(ID, 别名或名称 ),以前的价值,新价值等。
问题是,有可能时,每次使用新的记录更改后的数据多列。 同时,审计列的数量是5所以这是可能的“硬编码”有名字和改变核查。
因此,它是可以缩短的方式来编写这样的查询不只是使用工会做出了SELECT查询每一列和检查的变化?
比方说,有一个与列的表:
id, datetime value, int value, varchar value.
如果我有2个记录这样的数据变化,如:
id1, value1, value1, value1
id1, value2, value1, value2
然后,我希望这样的审计结果:
id1, value1 as oldvalue, value2 as newvalue, column2name as columnname
id1, value1 as oldvalue, value2 as newvalue, column4name as columnname
如果我没有错过任何东西:
WITH ranked AS (
SELECT
ChangeDate,
ColPK,
Col1,
Col2,
Col3,
Col4,
Col5,
OverallRank = ROW_NUMBER() OVER (PARTITION BY ColPK ORDER BY ChangeDate),
Col1Rank = ROW_NUMBER() OVER (PARTITION BY ColPK, Col1 ORDER BY ChangeDate),
Col2Rank = ROW_NUMBER() OVER (PARTITION BY ColPK, Col2 ORDER BY ChangeDate),
Col3Rank = ROW_NUMBER() OVER (PARTITION BY ColPK, Col3 ORDER BY ChangeDate),
Col4Rank = ROW_NUMBER() OVER (PARTITION BY ColPK, Col4 ORDER BY ChangeDate),
Col5Rank = ROW_NUMBER() OVER (PARTITION BY ColPK, Col5 ORDER BY ChangeDate)
FROM AuditTable
)
, ranked2 AS (
SELECT
ChangeDate,
ColPK,
Col1,
Col2,
Col3,
Col4,
Col5,
Col1Group = RANK() OVER (PARTITION BY ColPK, Col1 ORDER BY OverallRank - Col1Rank),
Col2Group = RANK() OVER (PARTITION BY ColPK, Col2 ORDER BY OverallRank - Col2Rank),
Col3Group = RANK() OVER (PARTITION BY ColPK, Col3 ORDER BY OverallRank - Col3Rank),
Col4Group = RANK() OVER (PARTITION BY ColPK, Col4 ORDER BY OverallRank - Col4Rank),
Col5Group = RANK() OVER (PARTITION BY ColPK, Col5 ORDER BY OverallRank - Col5Rank),
Col1Rank = ROW_NUMBER() OVER (PARTITION BY ColPK, Col1, OverallRank - Col1Rank ORDER BY ChangeDate),
Col2Rank = ROW_NUMBER() OVER (PARTITION BY ColPK, Col2, OverallRank - Col2Rank ORDER BY ChangeDate),
Col3Rank = ROW_NUMBER() OVER (PARTITION BY ColPK, Col3, OverallRank - Col3Rank ORDER BY ChangeDate),
Col4Rank = ROW_NUMBER() OVER (PARTITION BY ColPK, Col4, OverallRank - Col4Rank ORDER BY ChangeDate),
Col5Rank = ROW_NUMBER() OVER (PARTITION BY ColPK, Col5, OverallRank - Col5Rank ORDER BY ChangeDate)
FROM ranked
),
unpivoted AS (
SELECT
r.ChangeTime,
r.ColPK,
x.ColName,
ColRank = CASE x.Colname
WHEN 'Col1' THEN Col1Group
WHEN 'Col2' THEN Col2Group
WHEN 'Col3' THEN Col3Group
WHEN 'Col4' THEN Col4Group
WHEN 'Col5' THEN Col5Group
END,
Value = CASE x.Colname
WHEN 'Col1' THEN CONVERT(nvarchar(100), r.Col1)
WHEN 'Col2' THEN CONVERT(nvarchar(100), r.Col2)
WHEN 'Col3' THEN CONVERT(nvarchar(100), r.Col3)
WHEN 'Col4' THEN CONVERT(nvarchar(100), r.Col4)
WHEN 'Col5' THEN CONVERT(nvarchar(100), r.Col5)
END
FROM ranked2 r
INNER JOIN (VALUES ('Col1'), ('Col2'), ('Col3'), ('Col4'), ('Col5')) x (ColName)
ON x.ColName = 'Col1' AND Col1Rank = 1
OR x.ColName = 'Col2' AND Col2Rank = 1
OR x.ColName = 'Col3' AND Col3Rank = 1
OR x.ColName = 'Col4' AND Col4Rank = 1
OR x.ColName = 'Col5' AND Col5Rank = 1
)
SELECT
new.ChangeTime,
new.ColPK,
new.ColName,
old.Value AS OldValue,
new.Value AS NewValue
FROM unpivoted new
LEFT JOIN unpivoted old
ON new.ColPK = old.ColPK
AND new.ColName = old.ColName
AND new.ColRank = old.ColRank + 1
基本上,这个想法是排名相同值的连续的组并选择每一个值的第一出现。 这对于其值正在审核每一列完成,而列在这个过程中unpivot操作。 此后,逆转置行集连接到本身,即每一个PK和列名,每行相匹配,它的前身(基于排名)同一行的最终结果集,以获取旧值。
这里是产生相同的期望结果的简单查询,并且是很容易修改,以适应不同的列数或改变的列名的,因为唯一的区别是PK列(多个)+%的非PK-单行在列CROSS APPLY
。 我不得不添加一个ChangeDate
列-没有它,就没有办法知道插入到审计表中的行的顺序。
WITH ColValues AS (
SELECT
Grp = Row_Number() OVER (
PARTITION BY H.OrderID, U.ColName ORDER BY H.ChangeDate ASC, X.Which
) / 2,
H.OrderID,
H.ChangeDate,
U.*,
X.Which
FROM
dbo.OrderHistory H
CROSS APPLY (VALUES
('DeliveryDate', Convert(varchar(1000), DeliveryDate, 121)),
('Quantity', Convert(varchar(1000), Quantity)),
('SpecialNotes', Convert(varchar(1000), SpecialNotes))
) U (ColName, Value)
CROSS JOIN (VALUES (1), (2)) X (Which)
)
SELECT
V.OrderID,
V.ColName,
DateChanged = Max(V.ChangeDate),
OldValue = Max(F.Value),
NewValue = Max(T.Value)
FROM
ColValues V
OUTER APPLY (SELECT V.ColName, V.Value WHERE V.Which = 2) F
OUTER APPLY (SELECT V.ColName, V.Value WHERE V.Which = 1) T
GROUP BY
V.OrderID,
V.ColName,
V.Grp
HAVING
Count(*) = 2
AND EXISTS (
SELECT Max(F.Value)
EXCEPT SELECT Max(T.Value)
)
;
看到这个查询在SQL小提琴的现场演示 。
2012年SQL,这将是解决了一个更好的LEAD
或LAG
的分析功能。 在CROSS JOIN
和Row_Number
我的查询通过复制每一行和分配成对这些重复的行成自己的组(其中每个组都有较邻近的审计历史记录行两行)模拟这一点。 然后,通过战略性地使用聚集的,我们可以处理分组对来选择和比较它们的值。
另外,我原来写与查询UNPIVOT
,但很可惜,它不保留空值-严重疏忽微软,在我看来。 这本来是方便开发者添加如果需要删除空值的条件,但它的方式是UNPIVOT
可以希望保留空值时,根本无法使用。 讽刺的是,所得到的代码更紧凑,和2线变短,使用CROSS APPLY
到UNPIVOT -现在的转化率和unpivoting发生在一个步骤,而不是2。
我的样本数据是:
ChangeDate OrderID DeliveryDate Quantity SpecialNotes
----------------------- ------- ----------------------- -------- ----------------------------------------------------
2013-03-01 11:28:00.000 1 2013-04-01 00:00:00.000 25 NULL
2013-03-01 11:56:00.000 1 2013-04-01 00:00:00.000 30 NULL
2013-03-05 10:18:00.000 1 2013-04-02 00:00:00.000 30 Customer called to ask for delivery date adjustment.
2013-03-01 11:37:00.000 2 2013-03-05 00:00:00.000 17 NULL
所得到的行集:
OrderID ColName DateChanged OldValue NewValue
------- ------------ ----------------------- ----------------------- ---------------------------------------------------
1 DeliveryDate 2013-03-05 10:18:00.000 2013-04-01 00:00:00.000 2013-04-02 00:00:00.000
1 Quantity 2013-03-01 11:56:00.000 25 30
1 SpecialNotes 2013-03-05 10:18:00.000 NULL Customer called to ask for delivery date adjustment.
注:因为我的查询只有一个排序函数,没有JOIN
S,这即使在非常大的表上执行得非常好-好几个数量级,或许比使用一个解决方案JOIN
那里没有配套的指标。 这将是最好的审计表有一个聚集索引PK, ChangeDate
。