# A vectorized Mann-Kendall trend test implementation using numpy + scipy

In this post we are going to work out a vectorized Mann-Kendall trend test implementation using numpy and scipy.

# The goal

Mann-Kendall (MK) test is a non-parametric statistical test to validate the existence of monotonic upward/downward trend in a time series. The test is non-parametric, meaning that it does not assume the underlying distribution of the data being examined.

In the context of climatic researches, it can be used to examine the significance of, for example, the rising average temperature over a region during a certain period.

There are available Python implementations of the MK test, for instance, the pyMannKendall module, which is a much more versatile module than what we are going to build in this post.

However, when dealing with climatic/meteorological data, it is often desirable to perform not just one test on a single time series, but multiple tests on all of the grid cells within a domain. With increasing horizontal resolutions of datasets, this can be fairly slow if done using a nested for loop strategy in Python.

Therefore, in this post we are going to explore a vectorized Mann-Kendall trend test implementation that is designed to speed up such kind of computations.

# The Mann-Kendall trend test

The computations in a MK test are:

1. Denote a 1-dimensional time series as $$x_1, x_2, \cdots, x_n$$.
2. Determine the signs of all possible pairs of $$x_j – x_k$$:

$s_{jk} = sgn(x_j – x_k) = \left\{\begin{matrix} 1 & if\; x_j > x_k \\ 0 & if\; x_j = x_k \\ -1 & if\; x_j < x_k \\ \end{matrix}\right.$

where $$j > k$$, i.e. a later measurement minus an earlier one. It’s easy to work out that there are $$\frac{n(n-1)}{2}$$ such pairs.

3. Sum up all the $$s_{jk}$$:

$S = \sum_{k-1}^{n-1} \sum_{j-k+1}^{n} sgn(x_j – x_k)$

4. Compute the variance of $$S$$:

$VAR(S) = \frac{1}{18} [n(n-1)(2n + 5) – T]$

where $$T$$ is an adjustment factor for ties in the time series, and is computed as:

$T = \sum_{p-1}^g t_p (t_p – 1)(2t_p +5)$

where:

$$g$$: the number of tied groups,
$$t_p$$: the number of values in the group $$p$$.

E.g. the sequence $$1, 2, 2, 9.3, -1, 9.3, 5, 7, 2$$ has 3 tied groups:

(1) 2: appeared 3 times, giving $$t_1 = 3$$.
(2) 9.3: appeared twice, giving $$t_2 = 2$$.

Then the tie argument would be:

$T = \sum_{p-1}^g t_p (t_p – 1)(2t_p +5) = 3 \times (3 – 1) \times (2 \times 3 + 5) + 2 \times (2 – 1) \times (2 \times 2 + 5) = 84$

5. Compute the MK test statistic:

$Z = \left\{\begin{matrix} \frac{S-1}{\sqrt{VAR(S)}} & if \; S > 0 \\ 0 & if \, S = 0 \\ \frac{S+1}{\sqrt{VAR(S)}} & if \; S < 0 \end{matrix}\right.$

6. Do the hypothesis test using $$Z$$ as the standard score on a standard Normal distribution.

# Serial version

The serial version is relatively straightforward:

import numpy as np
from scipy.stats import norm

def countTies(data):
'''Count the number of ties in each group for all groups
'''
counts = np.unique(data, return_counts=True)[1]
counts = counts[np.where(counts>1)]
if len(counts) == 0:
return [0,]
else:
return counts

def MannKendallTrend(data, tails=2, verbose=True):
'''Perform Mann-Kendall Trend test on a timeseries

Args:
data (ndarray): 1d array, time series to examine trend.
Keyword Args:
tails (int): 1 or 2, 1- or 2- tailed test.
Returns:
p (float): p-value of the test statistic.
'''

n = data.size
s = np.sign(data[None,:] - data[:,None])
s = np.triu(s).sum()

# Count ties
counts = countTies(data)
tt = np.array(list(map(lambda x:x*(x-1)*(2*x+5), counts)))
tt = np.sum(tt)

var = (n*(n-1)*(2*n+5)-tt)/18.
z = (s - np.sign(s)) / np.sqrt(var)
p = norm.cdf(z)
p = 1-p if p>0.5 else p

if tails == 2:
p = p*2

if verbose:
print('\n# <testwmw>: s',s)
print('# <testwmw>: counts',counts)
print('# <testwmw>: tt',tt)
print('# <testwmw>: var',var)
print('# <testwmw>: z',z)
print('# <testwmw>: p',p)

return p

The function MannKendallTrend() first computes the signs of all possible pairs using

s = np.sign(data[None,:] - data[:,None])


Note that this is already using some vectorization trick rather than a nested for loop. Then the signs are summed using:

s = np.triu(s).sum()


This is taking the upper triangle of the s matrix, and we are not excluding cases of j = k on the diagonal line, since their contributions would be 0 anyways.

Then the tied values are found using the countTies() function, and the tie adjustment factor computed.

Lastly, the percentile of a standard Normal distribution is obtained using the scipy.stats.norm.cdf() function. The p-value is doubled if a 2-tailed test is desired.

# Vectorized version

## 1st implementation

The vectorized version will follow generally the same procedures as the serial version. The trickiest part lies in the computation of the tie adjustment factor.

To simply things, we are assuming the input data to be a 2D array, with shape (m, n), where m is the number of records, and n the dimension size of each record.

For a multi-cell trend analysis using climatic data, n would be the length of the time series in each grid cell, and m the total number of grid cells in the data. Therefore, if the data come in as a 3D cube with the format of (time, latitude, longitude), one would need to reshape it into (latitude * longitude, time).

The 1st vectorized version I wrote is the following:

def MannKendallTrend2D(data, tails=2, axis=0, verbose=True):
'''Vectorized Mann-Kendall tests on 2D matrix rows/columns

Args:
data (ndarray): 2d array with shape (m, n).
Keyword Args:
tails (int): 1 for 1-tail, 2 for 2-tail test.
axis (int): 0: test trend in each column. 1: test trend in each
row.
Returns:
z (ndarray): If <axis> = 0, 1d array with length <n>, standard scores
corresponding to data in each row in <x>.
If <axis> = 1, 1d array with length <m>, standard scores
corresponding to data in each column in <x>.
p (ndarray): p-values corresponding to <z>.
'''

if np.ndim(data) != 2:
raise Exception("<data> should be 2D.")

# alway put records in rows and do M-K test on each row
if axis == 0:
data = data.T

m, n = data.shape
s = np.sign(data[:,None,:]-data[:,:,None]).astype('int')

#--------------------Count ties--------------------
counts = countTies(data)
tt = counts * (counts - 1) * (2*counts + 5)
tt = tt.sum(axis=1)

#-----------------Sample Gaussian-----------------
var = (n * (n-1) * (2*n+5) - tt) / 18.
eps = 1e-8  # avoid dividing 0
z = (s - np.sign(s)) / (np.sqrt(var) + eps)
p = norm.cdf(z)
p = np.where(p>0.5, 1-p, p)

if tails==2:
p=p*2

return z, p

Note that:

Although an axis argument is provided to allow specifying the axis of the time dimension, I’m forcing the data to be in the (m, n) layout as mentioned above, by doing a transpose if axis=0: data = data.T.

The signs of all possible pairs are computed using:

s = np.sign(data[:,None,:]-data[:,:,None]).astype('int')


This is similar to the serial version except that it handles all grid cells simultaneously.

Again, we take only the upper triangle and sum it up:

mask = np.triu(np.ones([n, n])).astype('int')
s = np.sign(data[:,None,:]-data[:,:,None]).astype('int')
s = (s * mask).sum(axis=(1,2))

The tied groups are obtained using a new countTies() function, given below:

def countTies(x):
'''Count number of ties in rows of a 2D matrix

Args:
x (ndarray): 2d matrix.
Returns:
result (ndarray): 2d matrix with same shape as <x>. In each
row, the number of ties are inserted at (not really) arbitary
locations.
The locations of tie numbers in are not important, since
they will be subsequently put into a formula of sum(t*(t-1)*(2t+5)).

Inspired by: https://stackoverflow.com/a/24892274/2005415.
'''
if np.ndim(x) != 2:
raise Exception("<x> should be 2D.")

m, n = x.shape

x = copy.deepcopy(x)
x.sort(axis=1)
diff = np.diff(x, axis=1)

absdiff = np.abs(np.diff(cated, axis=1))

rows, cols = np.where(absdiff==1)
rows = rows.reshape(-1, 2)[:, 0]
cols = cols.reshape(-1, 2)
counts = np.diff(cols, axis=1)+1
result = np.zeros(x.shape).astype('int')
result[rows, cols[:,1]] = counts.flatten()

return result

This part is a bit tricky to vectorize, because the number of ties would be in general different for each record.

For instance, the 1st record (now has been forced to be the 1st row in data), contains 3 groups of ties:

$t_1 = 2; \; t_2 = 2; \; t_3 = 3$

If we put that in an array, it would be [2, 2, 3], or [2, 3, 2], since ordering doesn’t really matter.

Suppose the 2nd record (2nd row in data) has 1 group of ties with 2 values, then the output array for that would be [2, ].

The 3rd record may have no ties, giving [].

Therefore, the vectorized tie-counts array would have different number of values in its rows.

The solution would be to pad 0s to make it a squared shape, and the size to pad to can be just the length of the time series n, since it is guaranteed that the number of tie groups in a n-length time series is always smaller than n.

Zero-padding is acceptable, because it is observed that these padded 0s make no contribution to the final result, since the formula we use to compute tie adjustment is

tt = counts * (counts - 1) * (2*counts + 5)


Additionally, the positions of tie counts do not matter, since they are latter summed up in tt = tt.sum(axis=1).

So, the basic idea of countTies() is:

• make a deep copy of the input data, because we are going to sort the data in order along the time dimension (x.sort(axis=1)).
• after sorting, compute the differences between consecutive pairs.
• 0 in the differentiated consecutive pairs denotes a tie, and multiple consecutive 0s denote a tie group with more than 2 tied values.
• replace such 0s with 1s, and all other places with 0s.
• find the boundaries of such 1s, and they encode the indices of the tied values.
• the number of ties in each group can be worked out using these indices, and these tie counts are put to the locations of the last tied values.
• these are all done in a vectorized manner.

In the end, the resultant result variable is a (m, n) array. In each row, the number of ties in different tie groups in that row are inserted at different places (recall that positions are insignificant), and all other places are filled with 0s (recall that 0s do not contribute).

With the tie adjustment factor worked out, the rest of the MannKendallTrend2D() function is fairly self-explanatory.

This version works pretty well already. I tested it on a sample case with the input data being (50, 145, 192) in size (in (time, latitude, longitude) layout). On my desktop it took about 1.28 seconds. This solution was also offered as Stackoverflow answer.

## 2nd implementation

One big issue with the above implementation is memory usage. With vectorization speed gain we are sacrificing memory efficiency. For larger sized data, I got overflow warning and numpy simply refused to compute.

The biggest RAM eating block lies here:

mask = np.triu(np.ones([n, n])).astype('int')
s = np.sign(data[:,None,:]-data[:,:,None]).astype('int')

namely, the vectorized computation of all the pair-wise differences.

It is noticed that we don’t really need the complete matrix of data[:,None,:] - data[:,:,None], since only the upper triangle is needed, and the lower triangle gives only the opposite values.

Therefore I came up with this 2nd version (the countTies() function stays the same):

def MannKendallTrend2D(data, tails=2, axis=0, verbose=True):
'''Vectorized Mann-Kendall tests on 2D matrix rows/columns

Args:
data (ndarray): ndarray with shape (m, n).
Keyword Args:
tails (int): 1 for 1-tail, 2 for 2-tail test.
axis (int): 0: test trend in each column. 1: test trend in each
row.
Returns:
z (ndarray): If <axis> = 0, 1d array with length <n>, standard scores
corresponding to data in each row in <x>.
If <axis> = 1, 1d array with length <m>, standard scores
corresponding to data in each column in <x>.
p (ndarray): p-values corresponding to <z>.
'''

if np.ndim(data) != 2:
raise Exception("<data> should be 2D.")

# alway put records in rows and do M-K test on each row
if axis == 0:
data = data.T

m, n = data.shape

#--------------------Count ties--------------------
counts = countTies(data)

tt = counts * (counts - 1) * (2*counts + 5)
tt = tt.sum(axis=1)

#-----------------Sample Gaussian-----------------
var = (n * (n-1) * (2*n+5) - tt) / 18.
eps = 1e-8  # avoid dividing 0
z = (s - np.sign(s)) / (np.sqrt(var) + eps)
p = norm.cdf(z)

p = np.where(p>0.5, 1-p, p)

if tails==2:
p=p*2

return z, p

To highlight the differences, these are the new lines computing the pair-wise differences and signs:

mask = np.triu_indices(n, 1)
s = np.sign(data[:,mask[1]] - data[:,mask[0]]).sum(axis=1)

The np.triu_indices() gives indices of the upper triangle, so we can save half of original memory.

## 3rd implementation

Unfortunately, it is still quite heavy on RAM usage.

My 3rd attempt is no longer strictly vectorized, but a compromise between fully vectorized computation and an iterated one. I could not figure out an efficient solution using tools in numpy or scipy for this, and I think it might be a good place to look for other directions like cython or compiled code.

The 3rd MannKendallTrend2D() is:

def MannKendallTrend2D(data, tails=2, axis=0, verbose=True):
'''Vectorized Mann-Kendall tests on 2D matrix rows/columns

Args:
data (ndarray): ndarray with shape (m, n).
Keyword Args:
tails (int): 1 for 1-tail, 2 for 2-tail test.
axis (int): 0: test trend in each column. 1: test trend in each
row.
Returns:
z (ndarray): If <axis> = 0, 1d array with length <n>, standard scores
corresponding to data in each row in <x>.
If <axis> = 1, 1d array with length <m>, standard scores
corresponding to data in each column in <x>.
p (ndarray): p-values corresponding to <z>.
'''

if np.ndim(data) != 2:
raise Exception("<data> should be 2D.")

# alway put records in rows and do M-K test on each row
if axis == 0:
data = data.T

m, n = data.shape
try:
except MemoryError:
# data size too big, do it in batches
i = 1
s = np.zeros(m)
while True:
k = m//(n*i)
t = m//k + 1
try:
for j in range(t):
id1 = j*k
id2 = min((j+1)*k, m)
s[id1:id2] = sj
except:
i +=1
else:
break

#--------------------Count ties--------------------
counts = countTies(data)

tt = counts * (counts - 1) * (2*counts + 5)
tt = tt.sum(axis=1)

#-----------------Sample Gaussian-----------------
var = (n * (n-1) * (2*n+5) - tt) / 18.
eps = 1e-8  # avoid dividing 0
z = (s - np.sign(s)) / (np.sqrt(var) + eps)
p = norm.cdf(z)

p = np.where(p>0.5, 1-p, p)

if tails==2:
p=p*2

return z, p

The new part is:

mask = np.triu_indices(n, 1)
try:
except MemoryError:
# data size too big, do it in batches
i = 1
s = np.zeros(m)
while True:
k = m//(n*i)
t = m//k + 1
try:
for j in range(t):
id1 = j*k
id2 = min((j+1)*k, m)
break
So I’m trying in a try block the half-memory solution in the 2nd implementation to see if there is enough RAM to handle it. If not, I divide the entire task into a number of chunks, each time using the half-memory upper triangle trick on the chunk only, and put together the final array s piece-by-piece. This has been proven to work for big sized data that were giving me RAM overflow using the fully vectorized version.
In this post, I shared a vectorized Mann-Kendall trend test using numpy and scipy. It is helpful to carry out multiple trend tests on a gridded dataset, and can be considerably faster than using a nested for loop. However, it achieves this speed-up by sacrificing memory efficiency, and may trigger a RAM overflow on large data size.