Is it possible to scale data by group in Spark?

Posted 2020-02-13 03:24

Question:

I want to scale data with StandardScaler (from pyspark.mllib.feature import StandardScaler). So far I can do it by passing the values of the RDD to the transform function, but the problem is that I want to preserve the key. Is there any way to scale my data while preserving its key?

Sample dataset

0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,smurf.

Imports

import sys
import os
from collections import OrderedDict
from numpy import array
from math import sqrt
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.mllib.clustering import KMeans
    from pyspark.mllib.feature import StandardScaler
    from pyspark.statcounter import StatCounter

    print ("Successfully imported Spark Modules")
except ImportError as e:
    print ("Can not import Spark Modules", e)
    sys.exit(1)

Portion of code

    sc = SparkContext(conf=conf)   
    raw_data = sc.textFile(data_file)
    parsed_data = raw_data.map(Parseline)

Parseline function:

def Parseline(line):
    line_split = line.split(",")
    clean_line_split = [line_split[0]]+line_split[4:-1]
    return (line_split[-1], array([float(x) for x in clean_line_split]))
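To make the shape of the parsed records concrete, here is a minimal sketch that applies the same parsing logic to the first sample row, outside Spark. The name parse_line is only an illustrative copy of Parseline; nothing else is assumed beyond NumPy.

```python
import numpy as np

def parse_line(line):
    # Same logic as Parseline: the label (last field) becomes the key,
    # and the numeric columns become the value.
    fields = line.split(",")
    # Keep column 0 and columns 4..-2; this drops the three categorical
    # columns (protocol, service, flag) and the trailing label.
    numeric = [fields[0]] + fields[4:-1]
    return fields[-1], np.array([float(x) for x in numeric])

# First row of the sample dataset above.
sample = (
    "0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,"
    "0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,"
    "0.00,0.00,0.00,0.00,0.00,normal."
)

key, features = parse_line(sample)
print(key)             # the label, including its trailing dot
print(features.shape)  # one float per retained column
```

Each record is therefore a (label, feature vector) pair, and the label is the key that should survive scaling.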

Answer 1:

Not exactly a pretty solution, but you can adapt my answer to the similar Scala question. Let's start with some example data:

import numpy as np

np.random.seed(323)

keys = ["foo"] * 50 + ["bar"] * 50
values = (
    np.vstack([np.repeat(-10, 500), np.repeat(10, 500)]).reshape(100, -1) +
    np.random.rand(100, 10)
)

rdd = sc.parallelize(zip(keys, values))

Unfortunately, MultivariateStatisticalSummary is just a wrapper around a JVM model and is not really Python friendly. Luckily, with NumPy arrays we can use the standard StatCounter to compute statistics by key:

from pyspark.statcounter import StatCounter

def compute_stats(rdd):
    return rdd.aggregateByKey(
        StatCounter(), StatCounter.merge, StatCounter.mergeStats
    ).collectAsMap()
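For intuition, the per-key statistics can be reproduced locally with plain NumPy. The sketch below is only a stand-in for the aggregation, not Spark code: local_stats is a hypothetical helper, the seed is arbitrary, and it assumes the population standard deviation (ddof=0), which is what StatCounter.stdev() returns (sampleStdev() gives the sample version).

```python
import numpy as np

rng = np.random.RandomState(0)  # arbitrary seed, for reproducibility only

# Two groups of 50 rows with 10 columns each, centred near -10 and 10.
keys = ["foo"] * 50 + ["bar"] * 50
values = np.vstack([rng.rand(50, 10) - 10, rng.rand(50, 10) + 10])

def local_stats(pairs):
    # Group the rows by key, then take column-wise statistics per group.
    grouped = {}
    for k, v in pairs:
        grouped.setdefault(k, []).append(v)
    return {
        k: (np.mean(rows, axis=0), np.std(rows, axis=0))  # ddof=0: population std
        for k, rows in grouped.items()
    }

stats = local_stats(zip(keys, values))
mean_foo, std_foo = stats["foo"]
print(mean_foo.shape, std_foo.shape)  # one entry per column
```

The result has the same shape as the map built by compute_stats: one entry per key, each holding a vector of column means and a vector of column standard deviations.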

Finally we can map to normalize:

def scale(rdd, stats):
    def scale_(kv):
        k, v = kv
        return (v - stats[k].mean()) / stats[k].stdev()
    return rdd.map(scale_)

scaled = scale(rdd, compute_stats(rdd))
scaled.first()

## array([ 1.59879188, -1.66816084,  1.38546532,  1.76122047,  1.48132643,
##    0.01512487,  1.49336769,  0.47765982, -1.04271866,  1.55288814])
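Note that scale returns only the scaled array, so the key is dropped. A variant that keeps the key could look like the sketch below. The function name scale_keep_key, the (mean, stdev) pair layout, and the zero-deviation guard are assumptions added for illustration; they are not part of the original answer.

```python
import numpy as np

def scale_keep_key(kv, stats):
    # stats maps each key to a (mean, stdev) pair of NumPy vectors.
    k, v = kv
    mean, std = stats[k]
    # Assumed guard: a constant column has stdev 0, so divide by 1 instead
    # to avoid NaN or inf in the output.
    safe_std = np.where(std == 0, 1.0, std)
    return k, (v - mean) / safe_std

# Tiny local check with hand-made statistics.
stats = {"foo": (np.array([1.0, 5.0]), np.array([2.0, 0.0]))}
record = ("foo", np.array([3.0, 5.0]))
key, scaled = scale_keep_key(record, stats)
print(key, scaled)

# With the helpers from the answer, this might be wired up as:
#   pairs = {k: (s.mean(), s.stdev()) for k, s in compute_stats(rdd).items()}
#   scaled_rdd = rdd.map(lambda kv: scale_keep_key(kv, pairs))
```

The guard matters for data like the sample above, where several columns are zero in every row shown and would otherwise produce a division by zero.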