Skip to content

PseudoCode ‐ Normalization

Juan Emmanuel Johnson edited this page Feb 17, 2024 · 2 revisions

Examples ML

PyTorch

import torchvision.transforms as T

# NOTE(review): `A` is not imported in this snippet — presumably
# `import albumentations as A`, given the `augmentation(image=...)["image"]`
# call style used below; confirm.
# The normalizer is named `augmentation` (not `transform`) so that it is the
# object the function below actually calls and is not overwritten by the
# `def transform` that follows.
augmentation = A.Normalize(mean=mean, std=std)

def transform(sample):
  """Normalize the image stored in a sample dictionary.

  Args:
    sample: mapping with an "image" entry that supports ``.numpy()``.

  Returns:
    A new dict whose "image" entry is the normalized image.
  """
  # Move the first axis to the end before normalizing.
  # assumes sample["image"] is a channel-first (C, H, W) tensor — TODO confirm
  image = sample["image"].numpy().transpose(1, 2, 0)
  image = augmentation(image=image)["image"]
  return dict(image=image)

xarray

Dask Parallelization

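This method works very well when the files are clean (e.g. they share the same dimensions and coordinates, so they can be opened together as a single dataset).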

from dask.diagnostics import ProgressBar

# Open all files as one dask-backed dataset; chunking makes every later
# operation lazy, so nothing is loaded at this point.
ds = xr.open_mfdataset(list_of_files, chunks={"time": 1, "channel": 1})
# Reductions on dask-backed data only build a task graph; no `compute=False`
# flag is needed (it is not an argument of these methods).
mean: xr.Dataset = ds.mean()
std: xr.Dataset = ds.std()

# Trigger the actual computation and keep the results.
with ProgressBar():
  mean = mean.compute()
  std = std.compute()
 

Manual Parallelization

def preprocess(data, dim=None):
  """Reduce one file's data to its mean before the files are combined.

  Args:
    data: object exposing ``mean(dim=...)`` (e.g. an xarray Dataset).
    dim: dimension name(s) to average over, forwarded unchanged to
      ``data.mean``. With the default ``None`` xarray reduces over all
      dimensions; use ``functools.partial(preprocess, dim=[...])`` to pick
      specific ones.

  Returns:
    Whatever ``data.mean(dim=dim)`` returns.
  """
  return data.mean(dim=dim)

# Open every file with `preprocess` passed as the per-file hook, so each file
# is reduced before the files are combined (see the `preprocess` argument in
# xarray's `open_mfdataset` documentation).
means = xr.open_mfdataset(list_of_files, preprocess=preprocess)

Multiple Files

Mean

$$ \mu = \frac{1}{N}\sum_{n=1}^N x_n $$

# get list of files
list_of_files: List[str] = ...  # placeholder: fill in the paths of the files to process
# create mean operator
def operator(file: str):
  """Compute the mean of a single file's data.

  Args:
    file: path of the file to process.

  Returns:
    The result of ``calculate_mean`` applied to the opened file's ``data``.
  """
  # open file
  f: File = open_file(file)
  try:
    # calculate mean
    mean: Array = calculate_mean(f.data)
  finally:
    # close file even when the calculation fails, so the handle is not leaked
    close_file(f)
  return mean

# calculate mean of each file
means: List[Array] = list(map(operator, list_of_files))
# calculate mean
# NOTE: averaging the per-file means equals the overall mean only when every
# file contributes the same number of samples.
mean: Array = sum(means) / len(means)

Variance

$$ \sigma^2 = \frac{1}{N-1}\sum_{n=1}^N (x_n - \mu)^2 $$

# get list of files
list_of_files: List[str] = ...  # placeholder: fill in the paths of the files to process
# get mean
mean: Array = ...  # placeholder: the previously computed mean
# create variance operator
def operator(file: str):
  """Compute one file's squared deviation from the global ``mean``.

  Args:
    file: path of the file to process.

  Returns:
    ``(data - mean) ** 2`` for the opened file's ``data``; these per-file
    terms are summed afterwards to form the variance.
  """
  # open file
  f: File = open_file(file)
  try:
    # calculate squared deviation from the mean
    variance: Array = (f.data - mean) ** 2
  finally:
    # close file even when the calculation fails, so the handle is not leaked
    close_file(f)
  return variance

# calculate squared difference of each file
variances: List[Array] = list(map(operator, list_of_files))
# calculate variance with the N - 1 denominator from the formula
# (needs at least two files, otherwise this divides by zero)
variance: Array = sum(variances) / (len(variances) - 1)

Parallelization

Manually

import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

# Use ThreadPoolExecutor to process the files in parallel
results = []
with ThreadPoolExecutor(max_workers=10) as executor:
  # Map each future back to its file so that a failure can be reported per file
  futures = {executor.submit(operator, ifile): ifile for ifile in list_of_files}

  # Collect results as the tasks finish (completion order, not submission order)
  for future in concurrent.futures.as_completed(futures):
    try:
      results.append(future.result())
    except Exception as e:
      # Best effort: report the failing file and keep processing the others
      print(f"Error while processing {futures[future]}: {e}")

Dask