# The goal

The **Mann-Kendall (MK)** test is a **non-parametric** statistical test for
the existence of a monotonic upward/downward **trend** in a time series.
Being non-parametric, it does not assume any particular underlying
distribution of the data being examined.

In the context of climate research, it can be used to examine the significance of, for example, a rising average temperature over a region during a certain period.

There are Python implementations of the MK test available, for instance the pyMannKendall module, which is much more versatile than what we are going to build in this post.

However, when dealing with climatic/meteorological data, it is often desirable to perform not just one test on a single time series, but multiple tests on all of the grid cells within a domain. With the increasing horizontal resolution of datasets, this can be fairly slow if done with a nested for loop in Python.

Therefore, in this post we are going to explore a vectorized Mann-Kendall trend test implementation that is designed to speed up such computations.

# The Mann-Kendall trend test

The computations in an MK test are:

1. Denote a 1-dimensional time series as \(x_1, x_2, \cdots, x_n\).

2. Determine the signs of all possible pairs of \(x_j - x_k\):

\[
s_{jk} = \mathrm{sgn}(x_j - x_k) =
\left\{\begin{matrix}
1 & if\; x_j > x_k \\
0 & if\; x_j = x_k \\
-1 & if\; x_j < x_k \\
\end{matrix}\right.
\]

where \(j > k\), i.e. a later measurement minus an earlier one. It’s easy to work out that there are \(\frac{n(n-1)}{2}\) such pairs.

3. Sum up all the \(s_{jk}\):

\[
S = \sum_{k=1}^{n-1} \sum_{j=k+1}^{n} \mathrm{sgn}(x_j - x_k)
\]

4. Compute the variance of \(S\):

\[
VAR(S) = \frac{1}{18} [n(n-1)(2n + 5) - T]
\]

where \(T\) is an adjustment factor for ties in the time series, and is computed as:

\[
T = \sum_{p=1}^{g} t_p (t_p - 1)(2t_p + 5)
\]

where:

\(g\): the number of tied groups,

\(t_p\): the number of values in the group \(p\).

E.g. the sequence \(1, 2, 2, 9.3, -1, 9.3, 5, 7, 2\) has 2 tied groups:

(1) `2`: appears 3 times, giving \(t_1 = 3\).

(2) `9.3`: appears twice, giving \(t_2 = 2\).

Then the tie adjustment factor would be:

\[
T = \sum_{p=1}^{g} t_p (t_p - 1)(2t_p + 5) = 3 \times (3 - 1) \times (2
\times 3 + 5) + 2 \times (2 - 1) \times (2 \times 2 + 5) = 84
\]
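This value can be reproduced with a few lines of `numpy` (a quick check, separate from the implementations below):

```python
import numpy as np

x = np.array([1, 2, 2, 9.3, -1, 9.3, 5, 7, 2])

# counts of each distinct value; keep only the tied groups (count > 1)
counts = np.unique(x, return_counts=True)[1]
counts = counts[counts > 1]          # [3, 2] for the groups 2 and 9.3

T = np.sum(counts * (counts - 1) * (2*counts + 5))
print(T)  # 84
```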

5. Compute the MK test statistic:

\[
Z = \left\{\begin{matrix}
\frac{S-1}{\sqrt{VAR(S)}} & if\; S > 0 \\
0 & if\; S = 0 \\
\frac{S+1}{\sqrt{VAR(S)}} & if\; S < 0
\end{matrix}\right.
\]

6. Do the hypothesis test using \(Z\) as the standard score on a **standard Normal distribution**.

More details of the algorithm can be found on this page.

# Serial version

The serial version is relatively straightforward:

```
import numpy as np
from scipy.stats import norm

def countTies(data):
    '''Count the number of ties in each group for all groups
    '''
    counts = np.unique(data, return_counts=True)[1]
    counts = counts[np.where(counts>1)]
    if len(counts) == 0:
        return [0,]
    else:
        return counts

def MannKendallTrend(data, tails=2, verbose=True):
    '''Perform Mann-Kendall Trend test on a timeseries

    Args:
        data (ndarray): 1d array, time series to examine trend.
    Keyword Args:
        tails (int): 1 or 2, 1- or 2- tailed test.
    Returns:
        p (float): p-value of the test statistic.
    '''
    n = data.size
    s = np.sign(data[None,:] - data[:,None])
    s = np.triu(s).sum()

    # Count ties
    counts = countTies(data)
    tt = np.array(list(map(lambda x: x*(x-1)*(2*x+5), counts)))
    tt = np.sum(tt)

    var = (n*(n-1)*(2*n+5) - tt) / 18.
    z = (s - np.sign(s)) / np.sqrt(var)
    p = norm.cdf(z)
    p = 1-p if p > 0.5 else p

    if tails == 2:
        p = p*2

    if verbose:
        print('\n# <testwmw>: s', s)
        print('# <testwmw>: counts', counts)
        print('# <testwmw>: tt', tt)
        print('# <testwmw>: var', var)
        print('# <testwmw>: z', z)
        print('# <testwmw>: p', p)

    return p
```

The function `MannKendallTrend()` first computes the signs of all
possible pairs using

```
s = np.sign(data[None,:] - data[:,None])
```

Note that this is already using some vectorization trick rather than a nested for loop. Then the signs are summed using:

```
s = np.triu(s).sum()
```

This takes the upper triangle of the `s` matrix. We do not exclude the
`j = k` cases on the diagonal, since their contributions would be 0
anyway.
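On a toy series, the broadcasting trick and the upper-triangle sum look like this (made-up numbers, for illustration only):

```python
import numpy as np

data = np.array([1., 3., 2., 4.])

# (4, 4) matrix with element [k, j] = sgn(x_j - x_k)
s = np.sign(data[None, :] - data[:, None])

# keep only the upper triangle; the j = k diagonal contributes 0 anyway
S = np.triu(s).sum()
print(S)  # 4.0 (5 rising pairs minus 1 falling pair)
```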

Then the tied values are found using the `countTies()` function, and
the tie adjustment factor is computed.

Lastly, the percentile of a standard Normal distribution is obtained
using the `scipy.stats.norm.cdf()` function. The p-value is doubled
if a 2-tailed test is desired.
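To see the whole pipeline on a concrete input, the formulas can be traced for a strictly increasing, tie-free series (a standalone sanity check, independent of the function above):

```python
import numpy as np
from scipy.stats import norm

n = 10
data = np.arange(n, dtype=float)   # strictly increasing, no ties

# every pair with j > k gives sgn = +1, so S equals the number of pairs
S = np.triu(np.sign(data[None, :] - data[:, None])).sum()   # 45.0

# no ties, so T = 0 and the variance reduces to n(n-1)(2n+5)/18
var = n * (n - 1) * (2*n + 5) / 18.   # 125.0

z = (S - 1) / np.sqrt(var)        # the S > 0 branch of the Z formula
p = 2 * (1 - norm.cdf(z))         # 2-tailed p-value
print(p < 0.001)  # True: the upward trend is highly significant
```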

# Vectorized version

## 1st implementation

The vectorized version generally follows the same procedure as the serial version. The trickiest part lies in the computation of the tie adjustment factor.

To simplify things, we assume the input data to be a **2D array** with
shape `(m, n)`, where `m` is the number of records, and `n` the
dimension size of each record.

For a multi-cell trend analysis using climatic data, `n` would be the
length of the time series in each grid cell, and `m` the total number of
grid cells in the data. Therefore, if the data come in as a 3D cube
with the format of `(time, latitude, longitude)`, one would
need to reshape it into `(latitude * longitude, time)`.
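A minimal sketch of that reshaping, with made-up dimensions and random data:

```python
import numpy as np

ntime, nlat, nlon = 50, 4, 6
cube = np.random.rand(ntime, nlat, nlon)

# flatten the two spatial dimensions, then put time last: (lat*lon, time)
data = cube.reshape(ntime, -1).T
print(data.shape)  # (24, 50)

# each row is now the full time series of one grid cell
assert np.allclose(data[0], cube[:, 0, 0])
```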

The 1st vectorized version I wrote is the following:

```
def MannKendallTrend2D(data, tails=2, axis=0, verbose=True):
    '''Vectorized Mann-Kendall tests on 2D matrix rows/columns

    Args:
        data (ndarray): 2d array with shape (m, n).
    Keyword Args:
        tails (int): 1 for 1-tail, 2 for 2-tail test.
        axis (int): 0: test trend in each column. 1: test trend in each row.
    Returns:
        z (ndarray): If <axis> = 0, 1d array with length <n>, standard scores
            corresponding to data in each row in <x>.
            If <axis> = 1, 1d array with length <m>, standard scores
            corresponding to data in each column in <x>.
        p (ndarray): p-values corresponding to <z>.
    '''
    if np.ndim(data) != 2:
        raise Exception("<data> should be 2D.")

    # always put records in rows and do M-K test on each row
    if axis == 0:
        data = data.T

    m, n = data.shape
    mask = np.triu(np.ones([n, n])).astype('int')
    mask = np.repeat(mask[None,...], m, axis=0)
    s = np.sign(data[:,None,:] - data[:,:,None]).astype('int')
    s = (s * mask).sum(axis=(1,2))

    #--------------------Count ties--------------------
    counts = countTies(data)
    tt = counts * (counts - 1) * (2*counts + 5)
    tt = tt.sum(axis=1)

    #-----------------Sample Gaussian-----------------
    var = (n * (n-1) * (2*n+5) - tt) / 18.
    eps = 1e-8  # avoid dividing by 0
    z = (s - np.sign(s)) / (np.sqrt(var) + eps)
    p = norm.cdf(z)
    p = np.where(p>0.5, 1-p, p)
    if tails == 2:
        p = p*2

    return z, p
```

Note that although an `axis` argument is provided to allow specifying
the axis of the time dimension, I'm forcing the data to be in the
`(m, n)` layout as mentioned above, by doing a transpose
(`data = data.T`) if `axis = 0`.

The signs of all possible pairs are computed using:

```
s = np.sign(data[:,None,:]-data[:,:,None]).astype('int')
```

This is similar to the serial version except that it handles all grid cells simultaneously.

Again, we take only the upper triangle and sum it up:

```
mask = np.triu(np.ones([n, n])).astype('int')
mask = np.repeat(mask[None,...], m, axis=0)
s = np.sign(data[:,None,:]-data[:,:,None]).astype('int')
s = (s * mask).sum(axis=(1,2))
```
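As a consistency check (on small random, tie-free data), this masked sum agrees with applying the serial upper-triangle computation one row at a time:

```python
import numpy as np

np.random.seed(0)
m, n = 3, 6
data = np.random.rand(m, n)

# vectorized: one (n, n) sign matrix per record, upper triangle kept by the mask
mask = np.triu(np.ones([n, n])).astype('int')
mask = np.repeat(mask[None, ...], m, axis=0)
s_vec = (np.sign(data[:, None, :] - data[:, :, None]).astype('int') * mask).sum(axis=(1, 2))

# serial: the 1D computation applied to one row at a time
s_ser = np.array([np.triu(np.sign(row[None, :] - row[:, None])).sum() for row in data])

assert np.array_equal(s_vec, s_ser)
```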

The tied groups are obtained using a new `countTies()` function, given
below:

```
import copy

def countTies(x):
    '''Count number of ties in rows of a 2D matrix

    Args:
        x (ndarray): 2d matrix.
    Returns:
        result (ndarray): 2d matrix with same shape as <x>. In each row,
            the numbers of ties are inserted at (not really) arbitrary
            locations. The locations of the tie numbers are not important,
            since they will subsequently be put into a formula of
            sum(t*(t-1)*(2t+5)).

    Inspired by: https://stackoverflow.com/a/24892274/2005415.
    '''
    if np.ndim(x) != 2:
        raise Exception("<x> should be 2D.")

    m, n = x.shape
    pad0 = np.zeros([m, 1]).astype('int')

    x = copy.deepcopy(x)
    x.sort(axis=1)
    diff = np.diff(x, axis=1)

    cated = np.concatenate([pad0, np.where(diff==0, 1, 0), pad0], axis=1)
    absdiff = np.abs(np.diff(cated, axis=1))

    rows, cols = np.where(absdiff==1)
    rows = rows.reshape(-1, 2)[:, 0]
    cols = cols.reshape(-1, 2)
    counts = np.diff(cols, axis=1) + 1

    result = np.zeros(x.shape).astype('int')
    result[rows, cols[:,1]] = counts.flatten()

    return result
```

This part is a bit tricky to vectorize, because the number of ties will in general differ from record to record.

For instance, suppose the 1st record (now forced to be the 1st row in
`data`) contains 3 groups of ties:

\[
t_1 = 2; \; t_2 = 2; \; t_3 = 3
\]

If we put that in an array, it would be `[2, 2, 3]`, or `[2, 3, 2]`,
since the ordering doesn't really matter.

Suppose the 2nd record (2nd row in `data`) has 1 group of ties with 2
values; then the output array for that would be `[2, ]`.

The 3rd record may have no ties, giving `[]`.

Therefore, the vectorized tie-counts array would have different number of values in its rows.

The solution is to pad 0s to make the array rectangular. The padded
size can simply be the length of the time series `n`, since the number
of tie groups in an `n`-length time series is always smaller than `n`.

Zero-padding is acceptable, because the padded 0s make no contribution to the final result, given the formula we use to compute the tie adjustment:

```
tt = counts * (counts - 1) * (2*counts + 5)
```

Additionally, the positions of the tie counts do not matter, since they
are later summed up in `tt = tt.sum(axis=1)`.

So, the basic idea of `countTies()` is:

- make a deep copy of the input `data`, because we are going to sort
  the data along the time dimension (`x.sort(axis=1)`).
- after sorting, compute the differences between consecutive pairs.
- a 0 in the differenced consecutive pairs denotes a tie, and multiple
  consecutive 0s denote a tie group with more than 2 tied values.
- replace such 0s with 1s, and all other places with 0s.
- find the boundaries of such runs of 1s; they encode the indices of
  the tied values.
- the number of ties in each group can be worked out from these
  indices, and the tie counts are put at the locations of the last
  tied values.
- all of this is done in a vectorized manner.
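These steps can be traced on the example sequence from earlier (an inline walk-through of the same diff-and-boundary trick, not the function itself):

```python
import numpy as np

x = np.sort(np.array([1, 2, 2, 9.3, -1, 9.3, 5, 7, 2]))
# x: [-1, 1, 2, 2, 2, 5, 7, 9.3, 9.3]

# 1 wherever two consecutive sorted values are equal (i.e. part of a tie)
ties = np.where(np.diff(x) == 0, 1, 0)          # [0 0 1 1 0 0 0 1]

# pad with 0s and diff again: the +/-1 jumps mark each run's boundaries
bounds = np.abs(np.diff(np.concatenate([[0], ties, [0]])))
idx = np.where(bounds == 1)[0].reshape(-1, 2)   # [[2 4] [7 8]]

# a run spanning k positions corresponds to a group of k + 1 tied values
counts = idx[:, 1] - idx[:, 0] + 1
print(counts)  # [3 2]
```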

In the end, the resultant `result` variable is an `(m, n)` array. In
each row, the numbers of ties in the different tie groups of that row
are inserted at various places (recall that positions are
insignificant), and all other places are filled with 0s (recall that
0s do not contribute).

With the tie adjustment factor worked out, the rest of the
`MannKendallTrend2D()` function is fairly self-explanatory.

This version works pretty well already. I tested it on a sample case
with input data of size `(50, 145, 192)` (in `(time, latitude,
longitude)` layout). On my desktop it took about 1.28 seconds. This
solution was also offered as a Stackoverflow answer.

## 2nd implementation

One big issue with the above implementation is memory usage. With the
vectorization speed gain we are sacrificing memory efficiency. For
larger sized data, I got an overflow warning and `numpy` simply
refused to compute.

The biggest RAM-eating block lies here:

```
mask = np.triu(np.ones([n, n])).astype('int')
mask = np.repeat(mask[None,...], m, axis=0)
s = np.sign(data[:,None,:]-data[:,:,None]).astype('int')
```

namely, the vectorized computation of all the pair-wise differences.

Notice that we don't really need the complete matrix of
`data[:,None,:] - data[:,:,None]`, since only the upper triangle is
needed; the lower triangle gives only the opposite values.

Therefore I came up with this 2nd version (the `countTies()` function
stays the same):

```
def MannKendallTrend2D(data, tails=2, axis=0, verbose=True):
    '''Vectorized Mann-Kendall tests on 2D matrix rows/columns

    Args:
        data (ndarray): ndarray with shape (m, n).
    Keyword Args:
        tails (int): 1 for 1-tail, 2 for 2-tail test.
        axis (int): 0: test trend in each column. 1: test trend in each row.
    Returns:
        z (ndarray): If <axis> = 0, 1d array with length <n>, standard scores
            corresponding to data in each row in <x>.
            If <axis> = 1, 1d array with length <m>, standard scores
            corresponding to data in each column in <x>.
        p (ndarray): p-values corresponding to <z>.
    '''
    if np.ndim(data) != 2:
        raise Exception("<data> should be 2D.")

    # always put records in rows and do M-K test on each row
    if axis == 0:
        data = data.T

    m, n = data.shape
    mask = np.triu_indices(n, 1)
    s = np.sign(data[:,mask[1]] - data[:,mask[0]]).sum(axis=1)

    #--------------------Count ties--------------------
    counts = countTies(data)
    tt = counts * (counts - 1) * (2*counts + 5)
    tt = tt.sum(axis=1)

    #-----------------Sample Gaussian-----------------
    var = (n * (n-1) * (2*n+5) - tt) / 18.
    eps = 1e-8  # avoid dividing by 0
    z = (s - np.sign(s)) / (np.sqrt(var) + eps)
    p = norm.cdf(z)
    p = np.where(p>0.5, 1-p, p)
    if tails == 2:
        p = p*2

    return z, p
```

To highlight the differences, these are the new lines computing the pair-wise differences and signs:

```
mask = np.triu_indices(n, 1)
s = np.sign(data[:,mask[1]] - data[:,mask[0]]).sum(axis=1)
```

`np.triu_indices()` gives the indices of the upper triangle, so we can
save about half of the original memory.
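The saving can be quantified directly: `np.triu_indices(n, 1)` enumerates only the \(n(n-1)/2\) strict upper-triangle pairs, instead of the full \(n \times n\) sign matrix per record:

```python
import numpy as np

n = 50
mask = np.triu_indices(n, 1)   # (row indices, column indices), row < column

# only the strict upper-triangle pairs are enumerated
print(len(mask[0]))            # 1225 == n*(n-1)//2, versus n*n == 2500
```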

## 3rd implementation

Unfortunately, it is still quite heavy on RAM usage.

My 3rd attempt is no longer strictly vectorized, but a compromise
between fully vectorized computation and an iterated one. I could not
figure out a more efficient solution using tools in `numpy` or
`scipy`, and I think this might be a good place to look in other
directions, like **cython** or compiled code.

The 3rd `MannKendallTrend2D()` is:

```
def MannKendallTrend2D(data, tails=2, axis=0, verbose=True):
    '''Vectorized Mann-Kendall tests on 2D matrix rows/columns

    Args:
        data (ndarray): ndarray with shape (m, n).
    Keyword Args:
        tails (int): 1 for 1-tail, 2 for 2-tail test.
        axis (int): 0: test trend in each column. 1: test trend in each row.
    Returns:
        z (ndarray): If <axis> = 0, 1d array with length <n>, standard scores
            corresponding to data in each row in <x>.
            If <axis> = 1, 1d array with length <m>, standard scores
            corresponding to data in each column in <x>.
        p (ndarray): p-values corresponding to <z>.
    '''
    if np.ndim(data) != 2:
        raise Exception("<data> should be 2D.")

    # always put records in rows and do M-K test on each row
    if axis == 0:
        data = data.T

    m, n = data.shape
    mask = np.triu_indices(n, 1)

    try:
        s = np.sign(data[:,mask[1]] - data[:,mask[0]]).sum(axis=1)
    except MemoryError:
        # data size too big, do it in batches
        i = 1
        s = np.zeros(m)
        while True:
            k = max(m//(n*i), 1)  # guard against a 0 batch size
            t = m//k + 1
            try:
                for j in range(t):
                    id1 = j*k
                    id2 = min((j+1)*k, m)
                    sj = np.sign(data[id1:id2,mask[1]] -
                                 data[id1:id2,mask[0]]).sum(axis=1)
                    s[id1:id2] = sj
            except MemoryError:
                i += 1
            else:
                break

    #--------------------Count ties--------------------
    counts = countTies(data)
    tt = counts * (counts - 1) * (2*counts + 5)
    tt = tt.sum(axis=1)

    #-----------------Sample Gaussian-----------------
    var = (n * (n-1) * (2*n+5) - tt) / 18.
    eps = 1e-8  # avoid dividing by 0
    z = (s - np.sign(s)) / (np.sqrt(var) + eps)
    p = norm.cdf(z)
    p = np.where(p>0.5, 1-p, p)
    if tails == 2:
        p = p*2

    return z, p
```

The new part is:

```
mask = np.triu_indices(n, 1)

try:
    s = np.sign(data[:,mask[1]] - data[:,mask[0]]).sum(axis=1)
except MemoryError:
    # data size too big, do it in batches
    i = 1
    s = np.zeros(m)
    while True:
        k = max(m//(n*i), 1)  # guard against a 0 batch size
        t = m//k + 1
        try:
            for j in range(t):
                id1 = j*k
                id2 = min((j+1)*k, m)
                sj = np.sign(data[id1:id2,mask[1]] -
                             data[id1:id2,mask[0]]).sum(axis=1)
                s[id1:id2] = sj
        except MemoryError:
            i += 1
        else:
            break
```

So, in a `try` block, I first attempt the half-memory solution from the
2nd implementation to see if there is enough RAM to handle it. If not,
I divide the entire task into a number of chunks, each time using the
half-memory upper-triangle trick on one chunk only, and assemble the
final array `s` piece by piece. This has proven to work for big data
sizes that were giving me RAM overflows with the fully vectorized
version.
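A simpler, deterministic variant of the same idea is to always sweep the rows in fixed-size batches instead of waiting for a `MemoryError`; the function name `pairwise_sign_sums` and the `bsize` parameter below are my own inventions for this sketch:

```python
import numpy as np

def pairwise_sign_sums(data, bsize=1000):
    '''Sum sgn(x_j - x_k) over all pairs j > k, for each row of <data>,
    processing <bsize> rows at a time to bound peak memory.'''
    m, n = data.shape
    kdx, jdx = np.triu_indices(n, 1)   # pairs with k < j
    s = np.zeros(m)
    for i0 in range(0, m, bsize):
        i1 = min(i0 + bsize, m)
        s[i0:i1] = np.sign(data[i0:i1, jdx] - data[i0:i1, kdx]).sum(axis=1)
    return s

data = np.random.rand(2000, 30)
s = pairwise_sign_sums(data, bsize=300)

# agrees with the all-at-once computation
kdx, jdx = np.triu_indices(30, 1)
assert np.array_equal(s, np.sign(data[:, jdx] - data[:, kdx]).sum(axis=1))
```

The peak memory then scales with `bsize * n * (n-1) / 2` rather than with `m`, at the cost of a Python-level loop over batches.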

# Summary

In this post, I shared a vectorized Mann-Kendall trend test using
`numpy` and `scipy`. It is helpful for carrying out multiple trend
tests on a gridded dataset, and can be considerably faster than using
a nested for loop. However, it achieves this speed-up by sacrificing
memory efficiency, and may trigger a RAM overflow for large data sizes.