How to handle data skew in the spark data frame fo

2020-07-18 05:17发布

问题:

I have two data frames and I am performing outer join on 5 columns . Below is example of my data set .

uniqueFundamentalSet|^|PeriodId|^|SourceId|^|StatementTypeCode|^|StatementCurrencyId|^|FinancialStatementLineItem.lineItemId|^|FinancialAsReportedLineItemName|^|FinancialAsReportedLineItemName.languageId|^|FinancialStatementLineItemValue|^|AdjustedForCorporateActionValue|^|ReportedCurrencyId|^|IsAsReportedCurrencySetManually|^|Unit|^|IsTotal|^|StatementSectionCode|^|DimentionalLineItemId|^|IsDerived|^|EstimateMethodCode|^|EstimateMethodNote|^|EstimateMethodNote.languageId|^|FinancialLineItemSource|^|IsCombinedItem|^|IsExcludedFromStandardization|^|DocByteOffset|^|DocByteLength|^|BookMark|^|ItemDisplayedNegativeFlag|^|ItemScalingFactor|^|ItemDisplayedValue|^|ReportedValue|^|EditedDescription|^|EditedDescription.languageId|^|ReportedDescription|^|ReportedDescription.languageId|^|AsReportedInstanceSequence|^|PhysicalMeasureId|^|FinancialStatementLineItemSequence|^|SystemDerivedTypeCode|^|AsReportedExchangeRate|^|AsReportedExchangeRateSourceCurrencyId|^|ThirdPartySourceCode|^|FinancialStatementLineItemValueUpperRange|^|FinancialStatementLineItemLocalLanguageLabel|^|FinancialStatementLineItemLocalLanguageLabel.languageId|^|IsFinal|^|FinancialStatementLineItem.lineItemInstanceKey|^|StatementSectionIsCredit|^|CapitalChangeAdjustmentDate|^|ParentLineItemId|^|EstimateMethodId|^|StatementSectionId|^|SystemDerivedTypeCodeId|^|UnitEnumerationId|^|FiscalYear|^|IsAnnual|^|PeriodPermId|^|PeriodPermId.objectTypeId|^|PeriodPermId.objectType|^|AuditID|^|AsReportedItemId|^|ExpressionInstanceId|^|ExpressionText|^|FFAction|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|221|^|Average Age of Employees|^|505074|^|30.00000|^||^||^|False|^|1.00000|^|False|^|EMP|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|122880|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235002211206722736|^|True|^||^||^|3019656|^|3013652|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|498|^|Shareholders' Equity Per Share|^|505074|^|91.37000|^|678.74654|^|500186|^|False|^|1.00000|^|False|^|TAN|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|474880|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235004981302988315|^|True|^||^||^|3019656|^|3013751|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|500|^|Number of Shares Outstanding at Period End-Common Shares|^|505074|^|90000000.00000|^|12115420.96161|^||^|False|^|1000.00000|^|False|^|TAN|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|3|^||^||^||^|505074|^||^|505074|^||^||^|499712|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235005001178855709|^|True|^||^||^|3019656|^|3013751|^|3019679|^|1010067|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|562|^|Number of Employees|^|505074|^|2924.00000|^||^||^|False|^|1.00000|^|False|^|EMP|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|464864|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235005621461877526|^|True|^||^||^|3019656|^|3013652|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|655|^|Total number of shareholders|^|505074|^|11792.00000|^||^||^|False|^|1.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|466927|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235006551335570418|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|657|^|Total dividends paid (common stock)|^|505074|^|540000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|233463|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|12350065712483219|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|1452|^|Order received|^|505074|^|26936000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|350195|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235014521608462544|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|1453|^|Order backlogs|^|505074|^|1447000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|350195|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235014531922884465|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|1457|^|Export amount|^|505074|^|3924000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|291829|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235014571728332413|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239205|^|235|^|1|^|FTN|^|500186|^|1459|^|Capital expenditures (Note)|^|505074|^|659000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|350195|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1235014591148256870|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|255|^|Number of Employees|^|505074|^|10152.00000|^||^||^|False|^|1.00000|^|False|^|EMP|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|12288|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236002551128894330|^|True|^||^||^|3019656|^|3013652|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|256|^|Average Age of Employees|^|505074|^|34.00000|^||^||^|False|^|1.00000|^|False|^|EMP|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|122880|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236002561111316467|^|True|^||^||^|3019656|^|3013652|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|542|^|Shareholders' Equity Per Share|^|505074|^|160.20000|^|691.93184|^|500186|^|False|^|1.00000|^|False|^|TAN|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|471038|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236005421170597389|^|True|^||^||^|3019656|^|3013751|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|545|^|Number of Shares Outstanding at Period End-Common Shares|^|505074|^|679468000.00000|^|157314300.64243|^||^|False|^|1000.00000|^|False|^|TAN|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|3|^||^||^||^|505074|^||^|505074|^||^||^|472064|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236005451445165969|^|True|^||^||^|3019656|^|3013751|^|3019679|^|1010067|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|718|^|Total dividends paid (common stock)|^|505074|^|4750000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|458752|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236007181118043352|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|1364|^|Export amount|^|505074|^|15379000000.00000|^||^|500186|^|False|^|1000000.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|6|^||^||^||^|505074|^||^|505074|^||^||^|459752|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236013641649895533|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010068|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|
192730239285|^|236|^|1|^|FTN|^|500186|^|1407|^|Total number of shareholders|^|505074|^|57288.00000|^||^||^|False|^|1.00000|^|False|^|OTH|^||^|False|^|ARV|^||^|505074|^||^|False|^|False|^||^||^||^||^|0|^||^||^||^|505074|^||^|505074|^||^||^|460752|^|NA|^||^||^|TK |^||^||^|505126|^|True|^|1236014071623011361|^|True|^||^||^|3019656|^|3013716|^|3019679|^|1010066|^|1976|^|True|^||^|1000220295|^||^||^||^||^||^|I|!|

The structure of the second data set is also same

I am performing on first 5 columns . As you can see the combination of all first 5 columns does not provide me enough partition and that leads to data skew .

The spark job stuck on some of the Executor .

The size of the first dataset is 270 GB and second is 5 GB but expected to increase .

Total no of partition 1128

This is how I perform my join

val dfMainOutput = (dataMain.join(latestForEachKey, Seq("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId"), "outer") select (exprsExtended: _*)).filter(!$"FFAction|!|".contains("D|!|"))

I tried implementing Broadcast Join but no impact .

So in this case can I use salting or hashing on join key so that the joining key will become random and skew will not occur I guess .

Here is my query and app details

Here is the cluster details when we are loading the data .

And here is cluster details when most of the container is idle.

Adding the details of the task where some are 10 and on some executor only 3 to 4 .

回答1:

Please consider the following points:

1) Since you have 60 executors and 10 cores per executor your partitions should be at least 60 x 10 = 600 partitions

2) In your case you have 270GB / 1128 ~ 241MB this should approximately be the partition size which looks quite big to me (considering data exchange during shuffling). Try first to re-partition to something more realistic for instance 8K or even 16K.

3) Since I can not see clearly how many executors participate on job execution you need to check it again and figure out the exact number of participating executors and if data is equally distributed. If data deviation between executors is low then your data is well distributed otherwise you face skewing.

4) If after re-partition skewing insists try to redistribute the join keys as described here