Skip to content

Latest commit

 

History

History
500 lines (346 loc) · 16.8 KB

09a-summarizing.asciidoc

File metadata and controls

500 lines (346 loc) · 16.8 KB

simplify and summarize patterns in data: even simple counts and frequencies require some craft at large scale; measures that require any global context, like the median, become fiendish.

Describe long-tail and normal distribution

Build intuition about long-tail

Log counts and combinators (see blekko posts)

Libraries:

  • SciRuby

    • Distribution — probability distributions; uses fast GSL or Statistics2 if available

  • ruby-gsl-ng — interface to the GSL, JRuby-compatible. Incomplete tho.

Line numbering and exact median

Line numbering is astonishingly hard. Each reducer needs to know how many records came before it — but that information is non-local. It’s a total sort with a degree of difficulty.

Mapper

Choose how you’re going to partition and order the data.

Pig comes with a sampling partitioner to do a total sort with no pre-knowledge of the key distribution. If you know something about the data, you can partition yourself, saving a pass over the data. Temperature has a non-uniform distribution (there are more warm and chilly days than boiling or frigid days) Let’s be thoughtful but not overthink; we’ll take 25 C as the midpoint

# TODO: inverse distribution
       return low_bound if temp < -30
       return hi_bound  if temp >= 80
       (temp - 25.0)

sidebar: If the partition is the filename: in Pig, look at the PathPartitioner; in Wukong and Hadoop streaming, use the ENV['map_input_file'] environment variable. To partition randomly see [consistent_random_sampling].

For each record, emit a

[partition_key]   B    [key]  [record]

Each mapper must also emit how many keys it saw for each partition key, and send that list to every partition:

       0   A    [p0_count, p1_count, ... pN_count]
       1   A    [p0_count, p1_count, ... pN_count]
...
       N   A    [p0_count, p1_count, ... pN_count]

Now each reducer can figure out which partition key in order it is, sum the counts for every partition,

Approximate Median

Compare : Count bins (histogram) For 100 M rows — What about when there are 10,000 values? 1m values? 1B possible values?

observations:

  • it’s hadoop, we don’t have to be total wusses

  • the reducer already does a sort, so worrying about order N beyond that is silly *

set of numbers with exact ranks below and above — might not be members though cap on output from any mapper — target of data sent to each reducer if binning is easy then data sent to reducer will be small, if not it will be large can also adjust the partition vs local sort

  • single precision: 24 bits, exp - 126 to + 127 (8 bits)

  • double precision: 53 bits, exp -1022 to +1023 (11 bits)

Some Useful Statistical Functions

avoiding underflow and loss of precision

  • avoid subtracting nearly equal numbers [1]

  • avoid adding small numbers to very large numbers

  • stop and think any time you are mixing addition and exponentiation

  • stop and think any time you make a number huge, then regular (log(fact(x)))

  • stop and think any time you make a number tiny, then regular (1 - exp(-exp(x)))

Float times are sneaky examples of this

use special functions for the following:

  • log(1 + x) when x might be small log_one_plus(x)

  • log(1 + exp(x)) log_one_plus_exp(x)

  • log( -log(1 - x) ) complementary_log_log(x)

  • 1 - exp(-exp(x)) complementary_log_log_inverse(x)

  • log(n!) log_fact(x)

  • exp(x) - 1 exp_x_minus_one(x)

  • log( x / (1-x) ) the "logit" function logit(x)

  • exp(x)/(1 + exp(x)) inverse logit inverse_logit(x)

  • log(logit(x)) log logit log_logit(x)

  • Ratios of factorials — log( 200! / (190! * 10!)) = logfact(200) - logfact(190) - logfact(10)

Always add a comment explaining why you used these crazy functions, or some helpful soul will "refactor" your code to be simpler (and, unwittingly, wrong).

  • def approx_eq(xx,yy) (xx - yy).abs < TOL ; end

  • If you want to find weirdness, 0.1 cannot be exactly represented in a float.

  • to uniquely represent in decimal,

    • %12.9e3f for a float, 9 decimal digits of mantissa, exponent, and sign

    • for a double, 16 decimal digits of mantissa (plus sign and exponent)

    • note %a — a direct hex representation of the floating-point number. "%a" % 1e-100 is "0x1.bff2ee48e053p-333"; "%a" % 0.1 is "0x1.999999999999ap-4"

TODO: ensure some calculations would cause underflow, overflow, etc with float/int; and maybe even double/long.

Generate exponential variate:

expdist  = Math.log( rand ) / log_one_plus( -geomparam )
geomdist = floor( expdist )
Error function            	| `Math.erf`   		| http://www.ruby-doc.org/core-1.9.3/Math.html#method-c-erf
  Complementary Error func	| `Math.erfc`		| http://www.ruby-doc.org/core-1.9.3/Math.html#method-c-erfc
Inverse Error function   	|
Phi (standard normal CDF) 	| `0.5 * ( 1 + erf( x / sqrtof2 ) )`
Phi inverse               	| `sqrt(2.0) * ierf(2.0*x - 1.0)`
    				| `CC0, CC1, CC2 = [2.515517, 0.802853, 0.010328] ; DD0, DD1, DD2 = [1.432788, 0.189269, 0.001308]`
				| `def approx_rational(t) numerator = (CC2*t + CC1)*t + CC0 ; denominator = ((DD2*t + DD1)*t + DD0)*t + 1.0 ; t - numerator / denominator ; end`
				| `def inv_phi(p) if p < 0.5 then -approx_rational( Math.sqrt(-2.0 * Math.log(p)) ) else  approx_rational( Math.sqrt(-2.0 * Math.log(1.0 - p)) ) ; end`
Gamma                     	| `Math.gamma`		| http://www.ruby-doc.org/core-1.9.3/Math.html#method-c-gamma
Log Gamma                 	| `Math.lgamma`		| http://www.ruby-doc.org/core-1.9.3/Math.html#method-c-lgamma
`log(1 + x)` for small x  	| `(fabs(x) > 1e-4) ? log(1.0 + x) : (-0.5*x + 1.0)*x`
    else
`exp(x) - 1` for small x  	|
`log(n!)`                 	| `Math.lgamma(n+1)`	|
fraction+exponent of `exp()`	| `Math.frexp`		| http://www.ruby-doc.org/core-1.9.3/Math.html#method-c-erfc

    fr, ex = Math.frexp(val)
    # fr a float, ex an int
    fr * 2**ex   == val # => true
    ldexp(fr,ex) == val # => true

Uniform distributed (0.2 uSec)	| `rand`
Gamma distributed         	|
Normal distributed  (1.9 uSec) 	| `mean + stddev * Math.sqrt(-2.0 * Math.log(uniform)) * Math.cos(2.0 * Math::PI * uniform)`
                          	| `mean + stddev * Math.sqrt(-2.0 * Math.log(uniform)) * Math.sin(2.0 * Math::PI * uniform)`

Beta distributed          	| `uu = gammadist(a, 1) ; vv = gammadist(b, 1) ; u / (u + v)`
Cauchy distributed        	| `median + scale * Math.tan(Math::PI * (uniform - 0.5))`
Chi-square distributed    	| `gammadist(0.5 * degrees_of_freedom, 2.0)`
Exponential distributed   	| `1.0 / gammadist(shape, 1.0 / scale)`
Inverse gamma distributed 	| `1.0 / gammadist(shape, 1.0 / scale)`
Laplace distributed       	| `mean + Math.log(2) + ((u < 0.5 ? 1 : -1) * scale * Math.log(u < 0.5 ? u : 1 - u))`
Log normal distributed    	| `Math.exp(normal(mu, sigma))`
Poisson distributed       	|
Student-t distributed     	| `normal / ((chi_square(degrees_of_freedom) / degrees_of_freedom) ** 0.5)`
Weibull distributed       	| `scale * ((-Math.log(uniform)) ** (1.0 / shape))`
Geometric distributed      	| `expdist( -1.0 / log_one_plus(-geomparam) ).floor`

Binomial probability

        # @param pp [Float]
	# @param qq [Float]
	# @param mm [Integer]
	# @param nn [Integer]
	def binomial_prob(pp, qq, m, n)
	  log_bin  = gamma(mm + nn + 1.0)
	  log_bin -= lgamma(nn + 1.0) + lgamma(mm + 1.0)
	  log_bin += (mm * log(pp)) + (nn*log(qq))
	  return exp(log_bin)
	end

references:

Average and Standard Deviation using Welford’s Method

The naive method is var = ( sum(xx2) - (sum(x)2/count) ) / (count-1) (if it’s the entire population, divide by count not count-1. The difference is negligible for large count).

But wait!! We’re subtracting two possibly-close numbers, breaking the cardinal rule of numerical computing.

Welford’s method calculates these moments in a streaming fashion, in one pass. It avoids the danger of loss of numerical precision present in the naive approach.

    field :count,  Integer,  doc: "Number of records seen so far"
    field :mm,     Float,    doc: "A running estimate of the mean"
    field :ss,     Float,    doc: "A running proportion of the variance; the variance is `ss / (count-1)`"

    class Welford
      def initialize
        first_row(0.0)
      end

      def first_row(first_val)
        @count  = 0
	@mm     = first_val
	@ss     = 0.0
      end

      def process(val)
        @count   += 1
        diff      = val - @mm
        @mm, @ss  = [ @mm + (diff / @count), @ss + (diff * diff) ]
      end

      def stop
        emit( results )
      end

      def results
        [ count, mean, variance, stddev, mm, ss ]
      end

      def mean
        return 0.0 if count < 1
        mm
      end

      def variance
        return 0.0 if count < 2
        ss / (count - 1)
      end

      def stddev
        Math.sqrt(variance)
      end
    end

    class WelfordReducer
      mm_all  = sum{|count, mm| count * mm } / sum{|count| count }
      ss_all  = sum{ FIXME }
    end

Weighted:

    class WeightedWelford

      def process(val, weight)
        new_total_weight = total_weight + weight
	diff  = val - @mm
	rr    = diff * weight / new_total_weight
	@mm  += rr
	@ss  += @mm + (total_weight * diff * rr)
	total_weight = new_total_weight
	super
      end

      def variance
        ( ss * count.to_f / total_weight ) / (count-1)
      end
    end

    class WeightedWelfordReducer
      mm_all  = sum{ FIXME: what goes here }
      ss_all  = sum{ FIXME: what goes here }
    end

Naively:

    class Naive < Welford
      field :sum,    Float,    doc: "The simple sum of all the numbers"
      field :sum_sq, Float,    doc: "The simple sum of squares for all the numbers"

      def first_row(*)
	@sum    = 0
	@sum_sq = 0
	super
      end

      def process(val)
	@sum     += val
	@sum_sq  += val * val
	super
      end

      def results
        super + [ naive_mean, naive_variance, naive_stddev, sum, sum_sq ]
      end

      def naive_average   ; ( sum / count ) 				 ; end
      def naive_variance  ; ( sum_sq - ((sum * sum)/count) ) / (count-1) ; end
      def naive_stddev    ; Math.sqrt(naive_variance) 			 ; end
    end

Directly:

    def DirectMoments < Naive
      field :known_count,    doc: "The already-computed final count of all values"
      field :known_mean,     doc: "The already-computed final mean of all values"
      field :sum_dev_sq,     doc: "A running sum of the squared difference between each value and the mean"
      field :sdsq_adj,       doc: "A compensated-summation correction of the running sum"

      def first_row(*)
        @sum_dev_sq  = 0
	@sdsq_adj    = 0
	super
      end

      def process(val)
        @sum_dev_sq  += (val - known_mean)**2
	@sdsq_adj    += (val - known_mean)
	super
      end

      def results
        super + [ direct_mean, direct_variance, direct_stddev, compsum_variance, @sum_dev_sq, @sdsq_adj ]
      end

      def direct_mean      ; known_mean                 ; end
      def direct_stddev    ; Math.sqrt(direct_variance) ; end

      def direct_variance  ; sum_dev_sq / (count - 1)   ; end
      def compsum_variance
        ( sum_dev_sq - (sdsq_adj**2 / count) ) / (count-1)
      end
    end

To find higher moments,

  • each partition calculates the statistical moments (g0, mu, var, alpha_3, alpha_4)

    • for a time series, g0 is the duration; for a series, it’s the count.

  • now get g_mo_part(mo, part) := mm(mo,part) * g0(part)

  • then raw_moment(mo) := g_mo_all / g0_all

  • from raw moments get central moments: theta_mo(mo) := Expectation[(val - mean)**mo]

  • finally

    • mean_all := m_1_all

    • var_all := theta_2_all

    • alpha_3_all := theta_3_all / (var_all ** 3)

    • `alpha_4_all := theta_4_all / (var_all ** 4)

references:

  • John Cook’s Accurately computing running variance, who in turn cites

    • "Chan, Tony F.; Golub, Gene H.; LeVeque, Randall J. (1983). Algorithms for Computing the Sample Variance: Analysis and Recommendations. The American Statistician 37, 242-247."

    • "Ling, Robert F. (1974). Comparison of Several Algorithms for Computing Sample Means and Variances. Journal of the American Statistical Association, Vol. 69, No. 348, 859-866."

  • Algorithms for calculating variance

Total

   class CompensatedSummer
     field :tot, Float, doc: "Total of all values seen so far"
     field :adj,  Float, doc: "Accumulated adjustment to total"

     def first_record(val)
       self.tot = val
       self.adj  = 0
      end

     def process(val)
       old_tot  = @tot
       adj_val  = val - @adj
       @tot     =         old_tot  + adj_val
       @adj     = (@tot - old_tot) - adj_val
     end
   end
a      ____total____
a    +        _valH_ _valL_
a    = ___tmptot____
a
a      ___tmptot____
a    - ____total____
a    =        _valH_
a
a             _valH_
a    -        _valH_ _valL_
a    =               _valL_    (-corr)

Covariance

do

`( 1 / (count-1)) * sum[ ((val_x - mean_x) / stddev_x) * ((val_y - mean_y) / stddev_y) ]

To combine covariance of two sets,

CovAB = Cov_A + Cov_B + ( (mean_x_a - mean_x_b) * (mean_y_a - mean_y_b) * (count_a * count_b / (count_a + count_b)) )

Regression

    sx = 0, sy = 0, stt = 0.0, sts = 0.0

    sx = x_vals.sum
    sy = y_vals.sum

    x_vals.zip(y_vals).each do |xval, yval|
      t    = xval - (sx / count)
      stt += t * t
      sts += t * yval
    end

    slope     = sts / stt
    intercept = (sy - sx*slope) / count

To make a naive algorithm fail,

    num_samples      = 1e6

    def generate_samples
      xvals = num_samples.times.map{|i| x_offset   + i * x_spread }
      yvals = xvals.map{|xval| (actual_slope * xval) + actual_intercept + (actual_variance * normaldist()) }
    end

    large constant offset causes loss of precision:

    actual_slope     = 3
    actual_intercept = 1e10
    actual_variance  = 100
    x_offset         = 1e10
    x_spread         = 1
    generate_samples(...)

    very large slope causes inaccurate intercept:

    actual_slope     = 1e6
    actual_intercept = 50
    actual_variance  = 1
    x_offset         = 0
    x_spread         = 1e6
    generate_samples(...)

Using frexp, ldexp, and tracking int and frac separately

break numbers into bins where we can conveniently do exact Bignum math.

running totals — 

int_part, frac_part
frac_part = frac_part * smallest possible

keep sums using

Approximate methods

We can also just approximate.

Reservoir sampling.

If you know distribution, can do a good job. I know that cities of the world lie between 1 and 8 billion. If I want to know median within .1% (one part in 1000),

X_n / X_n-1 = 1.001 or log(xn) - log(xn1) = -3

Sampling

Random numbers + Hadoop considered harmful

Don’t generate a random number as a sampling or sort key in a map job. The problem is that map tasks can be restarted - because of speculative execution, a failed machine, etc. — and with random records, each of those runs will dispatch differently. It also makes life hard in general when your jobs aren’t predictable run-to-run. You want to make friends with a couple records early in the so urge, and keep track of its passage though the full data flow. Similarly to the best practice of using intrinsic vs synthetic keys, it’s always better to use intrinsic metadata —  truth should flow from the edge inward.


1. John Cook’s "cardinal rule of numerical computing" is "If x and y agree to m bits, up to m bits can be lost in computing x-y."