simplify and summarize patterns in data: even simple counts and frequencies require some craft at large scale; measures that require any global context, like the median, become fiendish.
Describe long-tail and normal distribution
Build intuition about long-tail
Log counts and combinators (see blekko posts)
Libraries:
-
-
Distribution — probability distributions; uses fast GSL or Statistics2 if available
-
-
ruby-gsl-ng — interface to the GSL, JRuby-compatible. Incomplete tho.
Line numbering is astonishingly hard. Each reducer needs to know how many records came before it — but that information is non-local. It’s a total sort with a degree of difficulty.
Choose how you’re going to partition and order the data.
Pig comes with a sampling partitioner to do a total sort with no pre-knowledge of the key distribution. If you know something about the data, you can partition yourself, saving a pass over the data. Temperature has a non-uniform distribution (there are more warm and chilly days than boiling or frigid days) Let’s be thoughtful but not overthink; we’ll take 25 C as the midpoint
# TODO: inverse distribution return low_bound if temp < -30 return hi_bound if temp >= 80 (temp - 25.0)
sidebar: If the partition is the filename: in Pig, look at the PathPartitioner
; in Wukong and Hadoop streaming, use the ENV['map_input_file']
environment variable. To partition randomly see [consistent_random_sampling].
For each record, emit a
[partition_key] B [key] [record]
Each mapper must also emit how many keys it saw for each partition key, and send that list to every partition:
0 A [p0_count, p1_count, ... pN_count] 1 A [p0_count, p1_count, ... pN_count] ... N A [p0_count, p1_count, ... pN_count]
Now each reducer can figure out which partition key in order it is, sum the counts for every partition,
Compare : Count bins (histogram) For 100 M rows — What about when there are 10,000 values? 1m values? 1B possible values?
observations:
-
it’s hadoop, we don’t have to be total wusses
-
the reducer already does a sort, so worrying about order N beyond that is silly *
set of numbers with exact ranks below and above — might not be members though cap on output from any mapper — target of data sent to each reducer if binning is easy then data sent to reducer will be small, if not it will be large can also adjust the partition vs local sort
-
single precision: 24 bits, exp - 126 to + 127 (8 bits)
-
double precision: 53 bits, exp -1022 to +1023 (11 bits)
-
avoid subtracting nearly equal numbers [1]
-
avoid adding small numbers to very large numbers
-
stop and think any time you are mixing addition and exponentiation
-
stop and think any time you make a number huge, then regular (
log(fact(x))
) -
stop and think any time you make a number tiny, then regular (
1 - exp(-exp(x))
)
Float times are sneaky examples of this
use special functions for the following:
-
log(1 + x)
when x might be small log_one_plus(x) -
log(1 + exp(x))
log_one_plus_exp(x) -
log( -log(1 - x) )
complementary_log_log(x) -
1 - exp(-exp(x))
complementary_log_log_inverse(x) -
log(n!)
log_fact(x) -
exp(x) - 1
exp_x_minus_one(x) -
log( x / (1-x) )
the "logit" function logit(x) -
exp(x)/(1 + exp(x))
inverse logit inverse_logit(x) -
log(logit(x))
log logit log_logit(x) -
Ratios of factorials —
log( 200! / (190! * 10!))
=logfact(200) - logfact(190) - logfact(10)
Always add a comment explaining why you used these crazy functions, or some helpful soul will "refactor" your code to be simpler (and, unwittingly, wrong).
-
def approx_eq(xx,yy) (xx - yy).abs < TOL ; end
-
If you want to find weirdness, 0.1 cannot be exactly represented in a float.
-
to uniquely represent in decimal,
-
%12.9e3f
for a float, 9 decimal digits of mantissa, exponent, and sign -
for a double, 16 decimal digits of mantissa (plus sign and exponent)
-
note
%a
— a direct hex representation of the floating-point number."%a" % 1e-100
is"0x1.bff2ee48e053p-333"
;"%a" % 0.1
is"0x1.999999999999ap-4"
-
TODO: ensure some calculations would cause underflow, overflow, etc with float/int; and maybe even double/long.
reference: Avoiding Overflow, Underflow, and Loss of Precision reference: Don’t Store That in a Float
Generate exponential variate:
expdist = Math.log( rand ) / log_one_plus( -geomparam ) geomdist = floor( expdist )
Error function | `Math.erf` | http://www.ruby-doc.org/core-1.9.3/Math.html#method-c-erf Complementary Error func | `Math.erfc` | http://www.ruby-doc.org/core-1.9.3/Math.html#method-c-erfc Inverse Error function | Phi (standard normal CDF) | `0.5 * ( 1 + erf( x / sqrtof2 ) )` Phi inverse | `sqrt(2.0) * ierf(2.0*x - 1.0)` | `CC0, CC1, CC2 = [2.515517, 0.802853, 0.010328] ; DD0, DD1, DD2 = [1.432788, 0.189269, 0.001308]` | `def approx_rational(t) numerator = (CC2*t + CC1)*t + CC0 ; denominator = ((DD2*t + DD1)*t + DD0)*t + 1.0 ; t - numerator / denominator ; end` | `def inv_phi(p) if p < 0.5 then -approx_rational( Math.sqrt(-2.0 * Math.log(p)) ) else approx_rational( Math.sqrt(-2.0 * Math.log(1.0 - p)) ) ; end` Gamma | `Math.gamma` | http://www.ruby-doc.org/core-1.9.3/Math.html#method-c-gamma Log Gamma | `Math.lgamma` | http://www.ruby-doc.org/core-1.9.3/Math.html#method-c-lgamma `log(1 + x)` for small x | `(fabs(x) > 1e-4) ? log(1.0 + x) : (-0.5*x + 1.0)*x` else `exp(x) - 1` for small x | `log(n!)` | `Math.lgamma(n+1)` | fraction+exponent of `exp()` | `Math.frexp` | http://www.ruby-doc.org/core-1.9.3/Math.html#method-c-erfc fr, ex = Math.frexp(val) # fr a float, ex an int fr * 2**ex == val # => true ldexp(fr,ex) == val # => true Uniform distributed (0.2 uSec) | `rand` Gamma distributed | Normal distributed (1.9 uSec) | `mean + stddev * Math.sqrt(-2.0 * Math.log(uniform)) * Math.cos(2.0 * Math::PI * uniform)` | `mean + stddev * Math.sqrt(-2.0 * Math.log(uniform)) * Math.sin(2.0 * Math::PI * uniform)` Beta distributed | `uu = gammadist(a, 1) ; vv = gammadist(b, 1) ; u / (u + v)` Cauchy distributed | `median + scale * Math.tan(Math::PI * (uniform - 0.5))` Chi-square distributed | `gammadist(0.5 * degrees_of_freedom, 2.0)` Exponential distributed | `1.0 / gammadist(shape, 1.0 / scale)` Inverse gamma distributed | `1.0 / gammadist(shape, 1.0 / scale)` Laplace distributed | `mean + Math.log(2) + ((u < 0.5 ? 1 : -1) * scale * Math.log(u < 0.5 ? u : 1 - u))` Log normal distributed | `Math.exp(normal(mu, sigma))` Poisson distributed | Student-t distributed | `normal / ((chi_square(degrees_of_freedom) / degrees_of_freedom) ** 0.5)` Weibull distributed | `scale * ((-Math.log(uniform)) ** (1.0 / shape))` Geometric distributed | `expdist( -1.0 / log_one_plus(-geomparam) ).floor` Binomial probability # @param pp [Float] # @param qq [Float] # @param mm [Integer] # @param nn [Integer] def binomial_prob(pp, qq, m, n) log_bin = gamma(mm + nn + 1.0) log_bin -= lgamma(nn + 1.0) + lgamma(mm + 1.0) log_bin += (mm * log(pp)) + (nn*log(qq)) return exp(log_bin) end
references:
-
John D Cook’s Stand Alone Code
-
ealdent’s simple-random, CPOL (MIT-compatible) license
The naive method is var = ( sum(xx2) - (sum(x)2/count) ) / (count-1)
(if it’s the entire population, divide by count
not count-1
. The difference is negligible for large count).
But wait!! We’re subtracting two possibly-close numbers, breaking the cardinal rule of numerical computing.
Welford’s method calculates these moments in a streaming fashion, in one pass. It avoids the danger of loss of numerical precision present in the naive approach.
field :count, Integer, doc: "Number of records seen so far" field :mm, Float, doc: "A running estimate of the mean" field :ss, Float, doc: "A running proportion of the variance; the variance is `ss / (count-1)`" class Welford def initialize first_row(0.0) end def first_row(first_val) @count = 0 @mm = first_val @ss = 0.0 end def process(val) @count += 1 diff = val - @mm @mm, @ss = [ @mm + (diff / @count), @ss + (diff * diff) ] end def stop emit( results ) end def results [ count, mean, variance, stddev, mm, ss ] end def mean return 0.0 if count < 1 mm end def variance return 0.0 if count < 2 ss / (count - 1) end def stddev Math.sqrt(variance) end end class WelfordReducer mm_all = sum{|count, mm| count * mm } / sum{|count| count } ss_all = sum{ FIXME } end
Weighted:
class WeightedWelford def process(val, weight) new_total_weight = total_weight + weight diff = val - @mm rr = diff * weight / new_total_weight @mm += rr @ss += @mm + (total_weight * diff * rr) total_weight = new_total_weight super end def variance ( ss * count.to_f / total_weight ) / (count-1) end end class WeightedWelfordReducer mm_all = sum{ FIXME: what goes here } ss_all = sum{ FIXME: what goes here } end
Naively:
class Naive < Welford field :sum, Float, doc: "The simple sum of all the numbers" field :sum_sq, Float, doc: "The simple sum of squares for all the numbers" def first_row(*) @sum = 0 @sum_sq = 0 super end def process(val) @sum += val @sum_sq += val * val super end def results super + [ naive_mean, naive_variance, naive_stddev, sum, sum_sq ] end def naive_average ; ( sum / count ) ; end def naive_variance ; ( sum_sq - ((sum * sum)/count) ) / (count-1) ; end def naive_stddev ; Math.sqrt(naive_variance) ; end end
Directly:
def DirectMoments < Naive field :known_count, doc: "The already-computed final count of all values" field :known_mean, doc: "The already-computed final mean of all values" field :sum_dev_sq, doc: "A running sum of the squared difference between each value and the mean" field :sdsq_adj, doc: "A compensated-summation correction of the running sum" def first_row(*) @sum_dev_sq = 0 @sdsq_adj = 0 super end def process(val) @sum_dev_sq += (val - known_mean)**2 @sdsq_adj += (val - known_mean) super end def results super + [ direct_mean, direct_variance, direct_stddev, compsum_variance, @sum_dev_sq, @sdsq_adj ] end def direct_mean ; known_mean ; end def direct_stddev ; Math.sqrt(direct_variance) ; end def direct_variance ; sum_dev_sq / (count - 1) ; end def compsum_variance ( sum_dev_sq - (sdsq_adj**2 / count) ) / (count-1) end end
To find higher moments,
-
each partition calculates the statistical moments
(g0, mu, var, alpha_3, alpha_4)
-
for a time series,
g0
is the duration; for a series, it’s the count.
-
-
now get
g_mo_part(mo, part) := mm(mo,part) * g0(part)
-
then
raw_moment(mo) := g_mo_all / g0_all
-
from raw moments get central moments:
theta_mo(mo) := Expectation[(val - mean)**mo]
-
finally
-
mean_all := m_1_all
-
var_all := theta_2_all
-
alpha_3_all := theta_3_all / (var_all ** 3)
-
`alpha_4_all := theta_4_all / (var_all ** 4)
-
references:
-
John Cook’s Accurately computing running variance, who in turn cites
-
"Chan, Tony F.; Golub, Gene H.; LeVeque, Randall J. (1983). Algorithms for Computing the Sample Variance: Analysis and Recommendations. The American Statistician 37, 242-247."
-
"Ling, Robert F. (1974). Comparison of Several Algorithms for Computing Sample Means and Variances. Journal of the American Statistical Association, Vol. 69, No. 348, 859-866."
-
class CompensatedSummer field :tot, Float, doc: "Total of all values seen so far" field :adj, Float, doc: "Accumulated adjustment to total" def first_record(val) self.tot = val self.adj = 0 end def process(val) old_tot = @tot adj_val = val - @adj @tot = old_tot + adj_val @adj = (@tot - old_tot) - adj_val end end
Consider this diagram, adapted from What Every Computer Scientist Should Know About Floating-Point Arithmetic
a ____total____ a + _valH_ _valL_ a = ___tmptot____ a a ___tmptot____ a - ____total____ a = _valH_ a a _valH_ a - _valH_ _valL_ a = _valL_ (-corr)
do
`( 1 / (count-1)) * sum[ ((val_x - mean_x) / stddev_x) * ((val_y - mean_y) / stddev_y) ]
To combine covariance of two sets,
CovAB = Cov_A + Cov_B + ( (mean_x_a - mean_x_b) * (mean_y_a - mean_y_b) * (count_a * count_b / (count_a + count_b)) )
REFERENCE: How to calculate correlation accurately
sx = 0, sy = 0, stt = 0.0, sts = 0.0 sx = x_vals.sum sy = y_vals.sum x_vals.zip(y_vals).each do |xval, yval| t = xval - (sx / count) stt += t * t sts += t * yval end slope = sts / stt intercept = (sy - sx*slope) / count
To make a naive algorithm fail,
num_samples = 1e6 def generate_samples xvals = num_samples.times.map{|i| x_offset + i * x_spread } yvals = xvals.map{|xval| (actual_slope * xval) + actual_intercept + (actual_variance * normaldist()) } end large constant offset causes loss of precision: actual_slope = 3 actual_intercept = 1e10 actual_variance = 100 x_offset = 1e10 x_spread = 1 generate_samples(...) very large slope causes inaccurate intercept: actual_slope = 1e6 actual_intercept = 50 actual_variance = 1 x_offset = 0 x_spread = 1e6 generate_samples(...)
-
John Cook, Comparing two ways to fit a line to data
break numbers into bins where we can conveniently do exact Bignum math.
running totals —
int_part, frac_part frac_part = frac_part * smallest possible
keep sums using
Don’t generate a random number as a sampling or sort key in a map job. The problem is that map tasks can be restarted - because of speculative execution, a failed machine, etc. — and with random records, each of those runs will dispatch differently. It also makes life hard in general when your jobs aren’t predictable run-to-run. You want to make friends with a couple records early in the so urge, and keep track of its passage though the full data flow. Similarly to the best practice of using intrinsic vs synthetic keys, it’s always better to use intrinsic metadata — truth should flow from the edge inward.
x
and y
agree to m
bits, up to m
bits can be lost in computing x-y
."