The goal
This post covers the implementation of pooling layers in a convolutional neural network using numpy. The complete code can be found in this GitHub repo.
The pooling operation
Like convolution, the pooling operation also involves an input image (or input data cube), and a pooling kernel (or filter).
There are typically 2 types of pooling layers in a convolutional neural network:
- max-pooling: the maximum value in each pooling window is taken as the pooling result. This is like grey-dilation in the image processing field, or a maximum filtering. (see Figure 1 below for an illustration)
- average-pooling: the average value in each pooling window is taken as the pooling result. This is like a moving average operation.
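To make the two operations concrete, here is a minimal sketch (not part of the post's code) computing both poolings for a single window:

```python
import numpy as np

# one 3 x 3 pooling window (the top-left window from the example later in this post)
window = np.array([
    [ 8,  3,  7],
    [ 2,  2,  2],
    [16, 15,  4]])

print(np.max(window))   # max-pooling result for this window: 16
print(np.mean(window))  # average-pooling result: ~6.56
```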
Like convolution, pooling is also parameterized by a stride. Suppose the input data have dimensions of \((h_i \times w_i \times c_i)\), where:
- \(h_i\), \(w_i\) are the height and width,
- \(c_i\) is the depth.
The pooling kernel has a size of \((f \times f)\). Then with a stride of \(s\), the pooling result will have a size of \((h_o \times w_o \times c_o)\), where:
- \(h_o = \lfloor (h_i - f)/s \rfloor + 1\), the output height,
- \(w_o = \lfloor (w_i - f)/s \rfloor + 1\), the output width,
- \(c_o = c_i\), the output depth,
- \(\lfloor \, \rfloor\) is the floor function.
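As a quick sanity check of these formulas, here is a small helper (hypothetical, not from the post's repo) that computes the output height/width:

```python
def pool_out_size(h_i, w_i, f, s):
    # h_o = floor((h_i - f) / s) + 1, and similarly for w_o
    h_o = (h_i - f) // s + 1
    w_o = (w_i - f) // s + 1
    return h_o, w_o

# e.g. a 6 x 6 input, 3 x 3 kernel, stride 3 pools down to 2 x 2
print(pool_out_size(6, 6, 3, 3))  # (2, 2)
```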
Note that padding is rarely used in pooling, therefore pooling produces a smaller-sized output, and that is the point of doing pooling: to shrink the size of the convolutional layers.
Also note that pooling does not alter the depth dimension. The max- or average- pooling happens independently in each channel, and the same number of channels are returned.
Numpy implementation of max- and average- pooling
Serial version
"Serial" here means that the pooling function handles 1 sample at a time. The input data are assumed to be either:
- 2D: with shape (h, w), or
- 3D: with shape (h, w, c),

where h, w are the image height/width, and c is the depth/channel dimension.
```python
import numpy as np

def poolingOverlap(mat, f, stride=None, method='max', pad=False,
                   return_max_pos=False):
    '''Overlapping pooling on 2D or 3D data.

    Args:
        mat (ndarray): input array to do pooling on the first 2 dimensions.
        f (int): pooling kernel size.
    Keyword Args:
        stride (int or None): stride in row/column. If None, same as <f>,
            i.e. non-overlapping pooling.
        method (str): 'max' for max-pooling, 'mean' for average-pooling.
        pad (bool): pad <mat> or not. If true, pad <mat> at the end in
            y-axis with (f-n%f) number of nans, if not evenly divisible,
            similar for the x-axis.
        return_max_pos (bool): whether to return an array recording the
            locations of the maxima if <method>=='max'. This could be used
            to back-propagate the errors in a network.
    Returns:
        result (ndarray): pooled array.

    See also unpooling().
    '''
    m, n = mat.shape[:2]
    if stride is None:
        stride = f
    # ceiling division: number of pooling windows along y/x
    _ceil = lambda x, y: x//y + int(x % y != 0)

    if pad:
        ny = _ceil(m, stride)
        nx = _ceil(n, stride)
        size = ((ny-1)*stride+f, (nx-1)*stride+f) + mat.shape[2:]
        # pad with nans so that nanmax/nanmean ignore the padded cells
        mat_pad = np.full(size, np.nan)
        mat_pad[:m, :n, ...] = mat
    else:
        mat_pad = mat[:(m-f)//stride*stride+f, :(n-f)//stride*stride+f, ...]

    view = asStride(mat_pad, (f, f), stride)
    if method == 'max':
        result = np.nanmax(view, axis=(2, 3), keepdims=return_max_pos)
    else:
        result = np.nanmean(view, axis=(2, 3), keepdims=return_max_pos)

    if return_max_pos:
        pos = np.where(result == view, 1, 0)
        result = np.squeeze(result, axis=(2, 3))
        return result, pos
    else:
        return result
```
Note that I’ve added the padding functionality just for good measure. The function deals with either max- or average- pooling, specified by the method keyword argument.
Also note that internally, it calls an asStride() function, which was introduced in a previous post talking about 2d and 3d convolutions. Without going into further details, the asStride() trick takes out all the pooling windows (which in general will contain overlapping grid cell locations, see the 9 red sub-matrices in Figure 1) in one go, allowing the max- or average- pooling to be carried out in a single np.nanmax() or np.nanmean() function call.
The asStride() function is given below:
```python
def asStride(arr, sub_shape, stride):
    '''Get a strided sub-matrices view of an ndarray.

    Args:
        arr (ndarray): input array of rank 2 or 3, with shape (m1, n1) or
            (m1, n1, c).
        sub_shape (tuple): window size: (m2, n2).
        stride (int): stride of windows in both y- and x- dimensions.
    Returns:
        subs (view): strided window view.

    See also skimage.util.shape.view_as_windows()
    '''
    s0, s1 = arr.strides[:2]
    m1, n1 = arr.shape[:2]
    m2, n2 = sub_shape[:2]

    view_shape = (1+(m1-m2)//stride, 1+(n1-n2)//stride, m2, n2)+arr.shape[2:]
    strides = (stride*s0, stride*s1, s0, s1)+arr.strides[2:]
    subs = np.lib.stride_tricks.as_strided(
        arr, view_shape, strides=strides, writeable=False)

    return subs
```
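To see what asStride() actually returns, here is a small illustrative run (a hypothetical example, not from the post): a 4 x 4 array viewed with 2 x 2 windows at stride 2 gives a 2 x 2 grid of windows:

```python
arr = np.arange(16).reshape(4, 4)
view = asStride(arr, (2, 2), 2)
print(view.shape)  # (2, 2, 2, 2): a 2 x 2 grid of 2 x 2 windows
print(view[0, 0])  # top-left window: [[0, 1], [4, 5]]
print(view[1, 1])  # bottom-right window: [[10, 11], [14, 15]]
```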
Lastly, the return_max_pos keyword argument controls whether to return the locations of the pooled maxima in each pooling window. This will be useful during the backpropagation process when training a convolutional neural network.
An example
A toy example is given below illustrating the max-pooling process:
```python
n = 6
f = 3
s = 3
np.random.seed(100)
mat = np.random.randint(0, 20, [n, n])
print('mat.shape:', mat.shape, 'filter size:', f, 'stride:', s)
print(mat)

po, pos = poolingOverlap(mat, f, stride=s, pad=False, return_max_pos=True)
print('Overlap pooling, no pad. Result shape:', po.shape)
print(po)
print('Max position pos.shape:')
print(pos.shape)
print('Max position in the (0,0) window:')
print(pos[0, 0])
print('Max position in the (0,1) window:')
print(pos[0, 1])
```
In this example, the data have shape (6, 6), filled with random integers between 0 and 19. The max-pooling kernel is (3, 3), with a stride of 3 (non-overlapping). Therefore the output has a height/width of \(\lfloor (6 - 3)/3 \rfloor + 1 = 2\).
Meanwhile, the locations of the maxima in each pooling window are also recorded, and the returned value pos has a shape of (2, 2, 3, 3). The first 2 dimensions correspond to the row/column of the pooled result, and the last 2 dimensions correspond to the pooling window operating on that particular location.
Output:
```
mat.shape: (6, 6) filter size: 3 stride: 3
[[ 8  3  7 15 16 10]
 [ 2  2  2 14  2 17]
 [16 15  4 11 16  9]
 [ 2 12  4  1 13 19]
 [ 4  4  3  7 17 15]
 [ 1 14  7 16  2  9]]
Overlap pooling, no pad. Result shape: (2, 2)
[[16 17]
 [14 19]]
Max position pos.shape:
(2, 2, 3, 3)
Max position in the (0,0) window:
[[0 0 0]
 [0 0 0]
 [1 0 0]]
Max position in the (0,1) window:
[[0 0 0]
 [0 0 1]
 [0 0 0]]
```
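As an optional correctness check (not part of the original post), the strided result can be compared against a naive double-loop reference implementation:

```python
def pooling_naive(mat, f, stride, method='max'):
    '''Hypothetical reference implementation: loop over all pooling windows.'''
    ny = (mat.shape[0] - f)//stride + 1
    nx = (mat.shape[1] - f)//stride + 1
    func = np.max if method == 'max' else np.mean
    out = np.zeros((ny, nx))
    for i in range(ny):
        for j in range(nx):
            out[i, j] = func(mat[i*stride:i*stride+f, j*stride:j*stride+f])
    return out

assert np.allclose(pooling_naive(mat, f, s), po)
```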
Vectorized version
The "vectorized" version is similar to the "serial" version, except that it handles multiple samples at a time, and the input data are assumed to have the shape (m, h, w, c), where h/w/c are the image height/width/depth as before, and m denotes the number of samples.
```python
def poolingOverlap(mat, f, stride=None, method='max', pad=False,
                   return_max_pos=False):
    '''Overlapping pooling on 4D data.

    Args:
        mat (ndarray): input array to do pooling on the mid 2 dimensions.
            Shape of the array is (m, hi, wi, ci). Where m: number of records.
            hi, wi: height and width of input image. ci: channels of input image.
        f (int): pooling kernel size in row/column.
    Keyword Args:
        stride (int or None): stride in row/column. If None, same as <f>,
            i.e. non-overlapping pooling.
        method (str): 'max' for max-pooling, 'mean' for average-pooling.
        pad (bool): pad <mat> or not. If true, pad <mat> at the end in
            y-axis with (f-n%f) number of nans, if not evenly divisible,
            similar for the x-axis.
        return_max_pos (bool): whether to return an array recording the
            locations of the maxima if <method>=='max'. This could be used
            to back-propagate the errors in a network.
    Returns:
        result (ndarray): pooled array.

    See also unpooling().
    '''
    m, hi, wi, ci = mat.shape
    if stride is None:
        stride = f
    # ceiling division: number of pooling windows along y/x
    _ceil = lambda x, y: x//y + int(x % y != 0)

    if pad:
        ny = _ceil(hi, stride)
        nx = _ceil(wi, stride)
        size = (m, (ny-1)*stride+f, (nx-1)*stride+f, ci)
        # pad with nans so that nanmax/nanmean ignore the padded cells
        mat_pad = np.full(size, np.nan)
        mat_pad[:, :hi, :wi, :] = mat
    else:
        mat_pad = mat[:, :(hi-f)//stride*stride+f, :(wi-f)//stride*stride+f, :]

    view = asStride(mat_pad, (f, f), stride)
    if method == 'max':
        result = np.nanmax(view, axis=(3, 4), keepdims=return_max_pos)
    else:
        result = np.nanmean(view, axis=(3, 4), keepdims=return_max_pos)

    if return_max_pos:
        pos = np.where(result == view, 1, 0)
        result = np.squeeze(result, axis=(3, 4))
        return result, pos
    else:
        return result
```
Similarly, the asStride() function is also a bit different from the "serial" version, but the general idea is the same. The function is given below:
```python
def asStride(arr, sub_shape, stride):
    '''Get a strided sub-matrices view of an ndarray.

    Args:
        arr (ndarray): input array of rank 4, with shape (m, hi, wi, ci).
        sub_shape (tuple): window size: (f1, f2).
        stride (int): stride of windows in both 2nd and 3rd dimensions.
    Returns:
        subs (view): strided window view.

    This is used to facilitate a vectorized 3d convolution. The input array
    <arr> has shape (m, hi, wi, ci), and is transformed to a strided view
    with shape (m, ho, wo, f, f, ci). where:
        m: number of records.
        hi, wi: height and width of input image.
        ci: channels of input image.
        f: kernel size.
    The convolution kernel has shape (f, f, ci, co). Then the vectorized 3d
    convolution can be achieved using either an einsum() or a tensordot():

        conv = np.einsum('myxfgc,fgcz->myxz', arr_view, kernel)
        conv = np.tensordot(arr_view, kernel, axes=([3, 4, 5], [0, 1, 2]))

    See also skimage.util.shape.view_as_windows()
    '''
    sm, sh, sw, sc = arr.strides
    m, hi, wi, ci = arr.shape
    f1, f2 = sub_shape

    view_shape = (m, 1+(hi-f1)//stride, 1+(wi-f2)//stride, f1, f2, ci)
    strides = (sm, stride*sh, stride*sw, sh, sw, sc)
    subs = np.lib.stride_tricks.as_strided(
        arr, view_shape, strides=strides, writeable=False)

    return subs
```
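The post does not include a worked example for the vectorized version, but a minimal usage sketch (with assumed random inputs) could look like this:

```python
np.random.seed(100)
batch = np.random.randint(0, 20, [2, 6, 6, 3])  # 2 samples, 6 x 6 images, 3 channels
po4, pos4 = poolingOverlap(batch, 3, stride=3, return_max_pos=True)
print(po4.shape)   # (2, 2, 2, 3): (m, ho, wo, c)
print(pos4.shape)  # (2, 2, 2, 3, 3, 3): (m, ho, wo, f, f, c)
```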
The unpooling operation
Unpooling is like the inverse of pooling, and is used during the backpropagation stage of the training process of a convolutional neural network.
For max-pooling, unpooling puts the input error values back to the original locations where the maxima values have been pooled out, and fills the rest of the places with 0s. This is why we need to keep a record of the maximum value locations during the pooling stage. See the schematic below for an illustration.
For average-pooling, unpooling divides the input error values evenly across the pooling window, and puts them back to their original places.
Numpy implementation of max-unpooling
We will tackle the unpooling problem separately for max- and average- pooling. This section introduces the "serial" and "vectorized" implementations of max-unpooling.
For max-pooling, aside from the input array (mat) to unpool, it also requires:
- the array recording the maxima locations: pos,
- the shape of the array to unpool to: ori_shape,
- the stride used during the pooling stage: stride.
Serial version
Corresponding to the serial version of the pooling function, the serial version of max-unpooling assumes that the input (mat) is either 2D or 3D:
For 2D input: mat is the max-pooled 2D data. The pos argument has a shape of (iy, ix, cy, cx), where iy, ix give the row/column sizes of the max-pooled array, and cy, cx are the same as the max-pooling kernel/filter size.
For instance, using the example given above, the top-left-most number in the max-pooled array is 16:

```
[[16 17]
 [14 19]]
```

therefore, iy = 0 and ix = 0 for the top-left-most number 16.
This number was taken from this 3 x 3 sub-array of the original array:

```
[[ 8  3  7]
 [ 2  2  2]
 [16 15  4]]
```
In this particular case, the maximum value 16 occurs at location (2, 0) inside the pooling window, therefore the sub-array pos[0, 0] has a value of 1 at (2, 0) correspondingly:

```
[[0 0 0]
 [0 0 0]
 [1 0 0]]
```
After unpooling, the input error value (denoted as x) will be put at the location of the 1, in the top-left-most 3 x 3 sub-array:

```
[[ 0 0 0 0 0 0]
 [ 0 0 0 0 0 0]
 [ x 0 0 0 0 0]
 [ 0 0 0 0 0 0]
 [ 0 0 0 0 0 0]
 [ 0 0 0 0 0 0]]
```
After putting back all input error values, the unpooled array will have error values at these locations, denoted as x:

```
[[ 0 0 0 0 0 0]
 [ 0 0 0 0 0 x]
 [ x 0 0 0 0 0]
 [ 0 0 0 0 0 x]
 [ 0 0 0 0 0 0]
 [ 0 x 0 0 0 0]]
```
For 3D input: mat is the max-pooled 3D data. The pos array has a shape of (iy, ix, cy, cx, cc), where the extra cc dimension is the depth dimension.
The function is given below:
```python
def unpooling(mat, pos, ori_shape, stride):
    '''Undo a max-pooling of a 2d or 3d array to a larger size.

    Args:
        mat (ndarray): 2d or 3d array to unpool on the first 2 dimensions.
        pos (ndarray): array recording the locations of maxima in the
            original array. If <mat> is 2d, <pos> is 4d with shape
            (iy, ix, cy, cx). Where iy/ix are the numbers of rows/columns
            in <mat>, cy/cx are the sizes of each pooling window.
            If <mat> is 3d, <pos> is 5d with shape (iy, ix, cy, cx, cc).
            Where cc is the number of channels in <mat>.
        ori_shape (tuple): original shape to unpool to.
        stride (int): stride used during the pooling process.
    Returns:
        result (ndarray): <mat> unpooled to shape <ori_shape>.
    '''
    assert np.ndim(pos) in [4, 5], '<pos> should be rank 4 or 5.'
    result = np.zeros(ori_shape)

    if np.ndim(pos) == 5:
        iy, ix, cy, cx, cc = np.where(pos == 1)
        iy2 = iy*stride
        ix2 = ix*stride
        iy2 = iy2+cy
        ix2 = ix2+cx
        values = mat[iy, ix, cc].flatten()
        result[iy2, ix2, cc] = values
    else:
        iy, ix, cy, cx = np.where(pos == 1)
        iy2 = iy*stride
        ix2 = ix*stride
        iy2 = iy2+cy
        ix2 = ix2+cx
        values = mat[iy, ix].flatten()
        result[iy2, ix2] = values

    return result
```
An example
Following up the above example, let’s unpool the max-pooled array of

```
[[16 17]
 [14 19]]
```
```python
unpo = unpooling(po, pos, mat.shape, s)
print('Unpooling result shape:', unpo.shape)
print(unpo)
```
The output:
```
Unpooling result shape: (6, 6)
[[ 0.  0.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  0. 17.]
 [16.  0.  0.  0.  0.  0.]
 [ 0.  0.  0.  0.  0. 19.]
 [ 0.  0.  0.  0.  0.  0.]
 [ 0. 14.  0.  0.  0.  0.]]
```
It can be seen that the unpooling process reverts back to the original shape of (6, 6), and the maxima values are put back at their original locations.
Vectorized version
The "vectorized" version looks similar to the "serial" version, except that we assume the input array mat to be 4D: (m, h, w, c), where:
- m: sample size,
- h, w: image height/width,
- c: image depth.
It is also assumed that mat has been max-pooled from a 4D data array (see the "vectorized" poolingOverlap() function above). Correspondingly, the maxima positions recorded in pos are 6D: (m, iy, ix, cy, cx, cc), where:
- m: sample size, as above,
- iy, ix: size of the max-pooled array,
- cy, cx: max-pooling kernel size,
- cc: depth dimension size.
The function definition is given below:
```python
def unpooling(mat, pos, ori_shape, stride):
    '''Undo a max-pooling of a 4d array to a larger size.

    Args:
        mat (ndarray): 4d array to unpool on the mid 2 dimensions.
        pos (ndarray): array recording the locations of maxima in the
            original array. If <mat> is 4d, <pos> is 6d with shape
            (m, h, w, y, x, c). Where h/w are the numbers of rows/columns
            in <mat>, y/x are the sizes of each pooling window.
            c is the number of channels in <mat>.
        ori_shape (tuple): original shape to unpool to: (m, h2, w2, c).
        stride (int): stride used during the pooling process.
    Returns:
        result (ndarray): <mat> unpooled to shape <ori_shape>.
    '''
    assert np.ndim(pos) == 6, '<pos> should be rank 6.'
    result = np.zeros(ori_shape)

    im, ih, iw, iy, ix, ic = np.where(pos == 1)
    ih2 = ih*stride
    iw2 = iw*stride
    ih2 = ih2+iy
    iw2 = iw2+ix
    values = mat[im, ih, iw, ic].flatten()
    result[im, ih2, iw2, ic] = values

    return result
```
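Continuing the hypothetical batch example from the pooling section above, the vectorized unpooling could be exercised like this:

```python
unpo4 = unpooling(po4, pos4, batch.shape, 3)
print(unpo4.shape)  # (2, 6, 6, 3): maxima scattered back, per sample and channel
```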
Numpy implementation of average-unpooling
The unpooling of an average-pooled array works quite differently from the unpooling of max-pooling, because we do not need to put the error values back at specific locations, but only need to send the evenly divided errors back to their original pooling window locations. See Figure 3 below for an illustration.
It works by first working out the evenly divided error, then duplicating that mean error a number of times. Both can be achieved by knowing the pooling kernel size. The rest of the game is just some housekeeping to make sure the dimensions are correct.
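The duplication is done with numpy's reshape() + repeat() combination. A tiny standalone sketch of the trick (a hypothetical example, with f = 2):

```python
import numpy as np

pooled = np.array([[1., 2.],
                   [3., 4.]])      # a 2 x 2 average-pooled array
f = 2
tmp = pooled.reshape(2, 1, 2, 1)   # insert singleton axes to repeat into
tmp = np.repeat(tmp, f, axis=1)    # duplicate each value f times along rows
tmp = np.repeat(tmp, f, axis=3)    # ... and f times along columns
print(tmp.reshape(4, 4))
# [[1. 1. 2. 2.]
#  [1. 1. 2. 2.]
#  [3. 3. 4. 4.]
#  [3. 3. 4. 4.]]
```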
Serial version
Again, the "serial" version handles one input sample at a time, and the mat array is assumed to be 2D or 3D ((h, w) or (h, w, c)).
The function is given below:
```python
def unpoolingAvg(mat, f, ori_shape):
    '''Undo an average-pooling of a 2d or 3d array to a larger size.

    Args:
        mat (ndarray): 2d or 3d array to unpool on the first 2 dimensions.
        f (int): pooling kernel size.
        ori_shape (tuple): original shape to unpool to.
    Returns:
        result (ndarray): <mat> unpooled to shape <ori_shape>.
    '''
    m, n = ori_shape[:2]
    ny = m//f
    nx = n//f

    tmp = np.reshape(mat, (ny, 1, nx, 1)+mat.shape[2:])
    tmp = np.repeat(tmp, f, axis=1)
    tmp = np.repeat(tmp, f, axis=3)
    tmp = np.reshape(tmp, (ny*f, nx*f)+mat.shape[2:])

    result = np.zeros(ori_shape)
    result[:tmp.shape[0], :tmp.shape[1], ...] = tmp

    return result
```
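A quick usage sketch (assuming the serial poolingOverlap() from earlier and the example mat above):

```python
avg = poolingOverlap(mat, 3, stride=3, method='mean')  # shape (2, 2)
unpo_avg = unpoolingAvg(avg, 3, mat.shape)
print(unpo_avg.shape)  # (6, 6): each mean value repeated over its 3 x 3 window
```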
Vectorized version
Again, the "vectorized" version handles m samples at a time. The input mat is assumed to be 4D: (m, h, w, c). The function is given below:
```python
def unpoolingAvg(mat, f, ori_shape):
    '''Undo an average-pooling of a 4d array to a larger size.

    Args:
        mat (ndarray): input array to do unpooling on the mid 2 dimensions.
            Shape of the array is (m, hi, wi, ci). Where m: number of records.
            hi, wi: height and width of input image. ci: channels of input image.
        f (int): pooling kernel size in row/column.
        ori_shape (tuple): original shape to unpool to: (m, h2, w2, c).
    Returns:
        result (ndarray): <mat> unpooled to shape <ori_shape>.
    '''
    m, hi, wi, ci = ori_shape
    ny = hi//f
    nx = wi//f

    tmp = np.reshape(mat, (m, ny, 1, nx, 1, ci))
    tmp = np.repeat(tmp, f, axis=2)
    tmp = np.repeat(tmp, f, axis=4)
    tmp = np.reshape(tmp, (m, ny*f, nx*f, ci))

    result = np.zeros(ori_shape)
    result[:, :tmp.shape[1], :tmp.shape[2], :] = tmp

    return result
```
Summary
Max- or average- pooling works on individual image channels independently, and both shrink the data size in the height/width dimensions.
Unpooling does the inverse and "broadcasts" a smaller sized image to a larger array.
In the max-unpooling process, the recorded position information is used to send back the errors to the locations of the maxima values during the max-pooling process. The rest of the array space is filled with 0s.
In the average-unpooling process, the input errors are evenly divided across each pooling window and sent back to the original locations.
For max- and average- pooling and unpooling, we developed a "serial" version and a "vectorized" version using numpy. The "vectorized" version has the advantage of being able to handle multiple samples at a time, and can considerably speed up the training process of a convolutional neural network.
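To illustrate that speed claim, one could benchmark the two along these lines (a hypothetical sketch; serial_pool and vectorized_pool stand for the serial and vectorized poolingOverlap() variants defined above):

```python
import timeit
import numpy as np

np.random.seed(100)
batch = np.random.rand(32, 64, 64, 3)

# serial: loop over the m samples one by one
t_serial = timeit.timeit(
    lambda: [serial_pool(x, 2) for x in batch], number=10)
# vectorized: pool the whole batch in one call
t_vec = timeit.timeit(
    lambda: vectorized_pool(batch, 2), number=10)
print('serial: %.3fs, vectorized: %.3fs' % (t_serial, t_vec))
```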