Source code for dragons.munge.munge

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""A collection of functions for doing common processing tasks."""

import re
import numpy as np
from astropy import log
from astroML.density_estimation import scotts_bin_width, freedman_bin_width,\
    knuth_bin_width, bayesian_blocks
from pandas import DataFrame
from scipy.stats import describe as sp_describe

__author__ = 'Simon Mutch'
__email__ = 'smutch.astro@gmail.com'
__version__ = '0.1.0'


def pretty_print_dict(d, fmtlen=30):
    """Pretty print a dictionary, dealing with recursion.

    *Args*:
        d : dict
            the dictionary to print

        fmtlen : int
            maximum length of dictionary key for print formatting
    """
    fmtstr = "%%%ds :" % fmtlen
    fmtstr_title = "\n%%%ds\n%%%ds" % (fmtlen, fmtlen)
    for k, v in d.items():
        if isinstance(v, dict):
            # Section header: key in caps, underlined by replacing every
            # non-whitespace character with a dash.  Raw string avoids the
            # invalid-escape warning the bare '\S' literal triggers.
            print(fmtstr_title % (k.upper(), re.sub(r'\S', '-', k)))
            # Bug fix: propagate fmtlen so nested levels keep the caller's
            # formatting width (previously it silently reset to 30).
            pretty_print_dict(v, fmtlen)
        else:
            print(fmtstr % k, v)
def ndarray_to_dataframe(arr, drop_vectors=False):
    """Convert a numpy structured ndarray to a pandas DataFrame, dealing
    with N(>1) dimensional datatypes.

    *Args*:
        arr : ndarray
            Numpy ndarray

    *Kwargs*:
        drop_vectors : bool
            only include single value datatypes in output DataFrame

    *Returns*:
        df : DataFrame
            Pandas DataFrame
    """
    # Scalar (0-d) fields can be handed straight to pandas.
    scalar_fields = [name for name, info in arr.dtype.fields.items()
                     if info[0].shape == ()]
    df = DataFrame(arr[scalar_fields])

    if not drop_vectors:
        # Expand each vector field into one column per component,
        # suffixed "_0", "_1", ...
        for name, info in arr.dtype.fields.items():
            field_shape = info[0].shape
            if field_shape != ():
                for component in range(field_shape[0]):
                    df[name + '_%d' % component] = arr[name][:, component]

    return df
def mass_function(mass, volume, bins, range=None, poisson_uncert=False,
                  return_edges=False, **kwargs):
    """Generate a mass function.

    *Args*:
        mass : array
            an array of 'masses'

        volume : float
            volume of simulation cube/subset

        bins : int or list or str
            If bins is a string, then it must be one of:
                | 'blocks' : use bayesian blocks for dynamic bin widths
                | 'knuth' : use Knuth's rule to determine bins
                | 'scott' : use Scott's rule to determine bins
                | 'freedman' : use the Freedman-diaconis rule to determine bins

    *Kwargs*:
        range : len=2 list or array
            range of data to be used for mass function

        poisson_uncert : bool
            return poisson uncertainties in output array (default: False)

        return_edges : bool
            return the bin_edges (default: False)

        \*\*kwargs
            passed to np.histogram call

    *Returns*:
        array of [bin centers, mass function vals]

        If poisson_uncert=True then array has 3rd column with uncertainties.

        If return_edges=True then the bin edges are also returned.

    *Notes*:
        The code to generate the bin_widths is taken from astroML.hist
    """
    # A normalised histogram would break the volume/width scaling done
    # below, so it is forcibly disabled.
    # NOTE(review): astropy's log.warn is deprecated in favour of
    # log.warning — confirm against the astropy version in use.
    if "normed" in kwargs:
        kwargs["normed"] = False
        log.warn("Turned off normed kwarg in mass_function()")

    # The astroML bin-width estimators have no range argument, so the data
    # are pre-clipped to `range` before the bin edges are derived.
    if (range is not None and
        (bins in ['blocks', 'knuth', 'knuths', 'scott', 'scotts',
                  'freedman', 'freedmans'])):
        mass = mass[(mass >= range[0]) & (mass <= range[1])]

    # String bin codes are resolved into explicit bin-edge arrays here;
    # the estimators' second positional arg (True) requests the edges.
    if isinstance(bins, str):
        log.info("Calculating bin widths using `%s' method..."
                 % bins)
        if bins in ['blocks']:
            bins = bayesian_blocks(mass)
        elif bins in ['knuth', 'knuths']:
            dm, bins = knuth_bin_width(mass, True)
        elif bins in ['scott', 'scotts']:
            dm, bins = scotts_bin_width(mass, True)
        elif bins in ['freedman', 'freedmans']:
            dm, bins = freedman_bin_width(mass, True)
        else:
            raise ValueError("unrecognized bin code: '%s'" % bins)
        log.info("...done")

    vals, edges = np.histogram(mass, bins, range, **kwargs)
    # Bin centers assume evenly spaced edges (width taken from the first
    # pair only — NOTE(review): 'blocks' bins may be uneven).
    width = edges[1] - edges[0]
    radius = width / 2.0
    centers = edges[:-1] + radius

    # Poisson uncertainty must be taken from the raw counts, before the
    # counts are converted into a number density below.
    if poisson_uncert:
        uncert = np.sqrt(vals.astype(float))

    # Convert counts -> number density per unit mass (dn / dM / V).
    vals = vals.astype(float) / (volume * width)

    if not poisson_uncert:
        mf = np.dstack((centers, vals)).squeeze()
    else:
        # Scale the uncertainty identically to the counts.
        uncert /= (volume * width)
        mf = np.dstack((centers, vals, uncert)).squeeze()

    if not return_edges:
        return mf
    else:
        return mf, edges
def edges_to_centers(edges, width=False):
    """Convert **evenly spaced** bin edges to centers.

    *Args*:
        edges : ndarray
            bin edges

    *Kwargs*:
        width : bool
            also return the bin width

    *Returns*:
        centers : ndarray
            bin centers (size = edges.size-1)

        bin_width : float
            only returned if width = True
    """
    # Uniform spacing is assumed, so the first gap defines the width.
    delta = edges[1] - edges[0]
    centers = edges[:-1] + delta * 0.5
    return (centers, delta) if width else centers
def describe(arr, **kwargs):
    """Run scipy.stats.describe and produce legible output.

    *Args*:
        arr : ndarray
            Numpy ndarray

    *Kwargs*:
        passed to scipy.stats.describe

    *Returns*:
        output of scipy.stats.describe
    """
    # Bug fix: kwargs were documented as being passed through but were
    # silently dropped — forward them to scipy.stats.describe.
    stats = sp_describe(arr, **kwargs)
    print(("{:15s} : {:g}".format("size", stats[0])))
    print(("{:15s} : {:g}".format("min", stats[1][0])))
    print(("{:15s} : {:g}".format("max", stats[1][1])))
    print(("{:15s} : {:g}".format("mean", stats[2])))
    # NOTE: the "unbiased"/"biased" labels reflect scipy's defaults
    # (ddof=1, bias=True); passing ddof/bias kwargs changes the estimators.
    print(("{:15s} : {:g}".format("unbiased var", stats[3])))
    print(("{:15s} : {:g}".format("biased skew", stats[4])))
    print(("{:15s} : {:g}".format("biased kurt", stats[5])))
    return stats
def smooth_grid(grid, side_length, radius, filt="tophat"):
    """Smooth a grid by convolution with a filter.

    *Args*:
        grid : ndarray
            The grid to be smoothed

        side_length : float
            The side length of the grid (assumes all side lengths are equal)

        radius : float
            The radius of the smoothing filter

    *Kwargs*:
        filt : string
            The name of the filter.  Currently only "tophat" (real space) is
            implemented.  More filters will be added over time.

    *Returns*:
        smoothed_grid : ndarray
            The smoothed grid
    """
    # The tuple of implemented filters
    IMPLEMENTED_FILTERS = ("tophat",)

    # Check to see if this filter is implemented
    if filt not in IMPLEMENTED_FILTERS:
        raise NotImplementedError("Filter not implemented.")

    side_length, radius = float(side_length), float(radius)

    # Bug fix: remember the real-space shape.  np.fft.irfftn cannot
    # recover it unambiguously on its own — without `s` an odd-sized last
    # axis would be reconstructed as even, returning the wrong shape.
    orig_shape = grid.shape

    # Do the forward fft
    grid = np.fft.rfftn(grid)

    # Construct a grid of k*radius values (fftfreq with d=1/N gives
    # integer mode numbers; the first axis length is used for all axes,
    # consistent with the cubic-grid assumption above)
    k = 2.0 * np.pi * np.fft.fftfreq(grid.shape[0],
                                     1 / float(grid.shape[0])) / side_length
    k_r = 2.0 * np.pi * np.fft.rfftfreq(grid.shape[0],
                                        1 / float(grid.shape[0])) / side_length
    k = np.meshgrid(k, k, k_r, sparse=True, indexing='ij')
    kR = np.sqrt(k[0] ** 2 + k[1] ** 2 + k[2] ** 2) * radius

    # Evaluate the convolution
    if filt == "tophat":
        # Fourier transform of a real-space spherical tophat:
        # W(kR) = 3 (sin(kR) - kR cos(kR)) / (kR)^3.
        # The kR == 0 element divides by zero; silence the spurious
        # warnings and patch the DC mode (W -> 1 as kR -> 0) below.
        with np.errstate(divide='ignore', invalid='ignore'):
            fgrid = grid * 3.0 * (np.sin(kR) / kR ** 3 - np.cos(kR) / kR ** 2)
        fgrid[kR == 0] = grid[kR == 0]

    # Inverse transform back to real space, restoring the original shape
    grid = np.fft.irfftn(fgrid, s=orig_shape).real

    # Make sure intermediates are marked available for garbage collection
    del fgrid, k, kR

    return grid
[docs]def power_spectrum(grid, side_length, n_bins): r"""Calculate the dimensionless power spectrum of a grid (G): .. math:: \Delta = \frac{k^3}{2\pi^2 V} <|\hat G|^2> *Args*: grid : ndarray The grid from which to construct the power spectrum side_length : float The side length of the grid (assumes all side lengths are equal) n_bins : int The number of k bins to use *Returns*: kmean : ndarray The mean wavenumber of each bin power : ndarray The dimensionless power (:math:`\Delta`) uncert : ndarray The standard deviation of the power within each k bin """ volume = side_length**3 # do the FFT (note the normalising 1.0/N_cells factor) ft_grid = np.fft.rfftn(grid) / float(grid.size) # generate a grid of k magnitudes k1d = 2.0*np.pi * np.fft.fftfreq(grid.shape[0], 1/float(grid.shape[0])) / side_length k1d_r = 2.0*np.pi * np.fft.rfftfreq(grid.shape[0], 1/float(grid.shape[0])) / side_length k = np.meshgrid(k1d, k1d, k1d_r, sparse=True, indexing='ij') k = np.sqrt(k[0]**2 + k[1]**2 + k[2]**2) # bin up the k magnitudes k_edges = np.logspace(np.log10(k1d_r[1]), np.log10(k1d_r[-1]), n_bins+1) k_bin = np.digitize(k.flat, k_edges) - 1 np.clip(k_bin, 0, n_bins-1, k_bin) k_bin.shape = k.shape # loop through each k magnitude bin and calculate the mean power, k and # uncert kmean = np.zeros(n_bins) power = np.zeros(n_bins) uncert = np.zeros(n_bins) for ii in range(n_bins): sel = k_bin == ii val = k[sel]**3 * np.abs(ft_grid[sel])**2 / (2.0 * np.pi**2) * volume power[ii] = val.mean() uncert[ii] = val.std() kmean[ii] = k[sel].mean() return kmean, power, uncert