Cache NetCDF data to disk

In this post we create a *DiskCacher* class that can be used as a *with* context manager, as a function wrapper, and as a pie-syntax decorator. It wraps a given computation function, uses the cached data whenever possible, and only does the re-computation if the cache is not found or the user forces an overwrite.

In a previous post we talked about in-memory caching using a dictionary in Python. Sometimes the data may be too big to fit into RAM, while the time required to re-compute them is significantly longer than the time needed to read them back from disk. In such situations, one may choose to cache the data to disk, and load them in to bypass the re-computation. In this post, I’ll share my implementation of some classes/functions that cache NetCDF data to disk. The complete script file can be obtained from here.

Rationale

In my research work, I often have to write Python scripts that perform some analyses made up of multiple steps, say

  • Step 1: read in data.
  • Step 2: compute to get intermediate results.
  • Step 3: create plots using results from Step 2.

And the same script has to be executed over and over again as I make adjustments to the code to fix problems, tweak some parameters in the algorithms, or modify the plot layout, etc. For this kind of workflow, it would be beneficial if I could cache the intermediate results of Step 2 somewhere, so that when I adjust the code in Step 3, I can read the results from the cache rather than re-computing them repeatedly.

A naive and primitive solution would be to add a new code block between Step 2 and Step 3 to save the results from Step 2, then comment out the computation code of Step 2 and replace it with a block that reads the data back from disk. The new script structure would be like:

  • Step 1: read in data.
  • Commented out Step 2: compute to get intermediate results.
  • New Step 2a: Read in intermediate results from disk.
  • New Step 2b: Save intermediate results to disk.
  • Step 3: create plots using results from Step 2.

Apparently the New Step 2a and New Step 2b cannot co-exist. I have to use the New Step 2b first to save to disk, then comment it out and uncomment New Step 2a for subsequent executions of the script. If I need to change the computation in Step 2, I have to revive the old Step 2 and the New Step 2b, comment out New Step 2a, and run the script to save the updated intermediate results, then comment/uncomment the code blocks again to use the disk cache. Needless to say, this gets confusing and tedious quickly, and both steps are prone to mistakes.

Another solution would be to break the script into 2 separate ones: in script_part1, do Step 1 and Step 2, then save the intermediate results to disk; in script_part2, read the intermediate results back from disk and create the plots. This is a cleaner solution than the previous one; however, it is still not ideal in my opinion. Imagine the entire analysis workflow consists of 4, 5 or more steps instead of just 3: we would then have more sub-part scripts, and more intermediate data files, to maintain.

Design choice

Basic idea

I feel that a disk cache would be a better solution than the above 2. The idea is to modify the function that creates the intermediate results in Step 2, such that the results are saved to a dedicated location on the hard drive the first time it is run. For subsequent executions, it first checks the cache location and reads in the data if they are there, or does a re-computation otherwise. There would also need to be an overwrite flag, so that after adjustments to the computation code I could force a re-computation by setting overwrite to True, rather than going into the cache location and deleting the cache files myself.

The decorator approach

This design idea is really suggestive of the decorator approach. The core function responsible for the computation of Step 2 stays intact, and we use a decorator to wrap the caching and reading functionalities around it. That’s a plausible design choice.

But I may not want to permanently modify the core function’s behavior by using the pie syntax of Python, like:

@DiskCacher(some_args)
def myFunctionForStep2():
   ...

Because I may want to re-use myFunctionForStep2() somewhere else where I don’t want this caching functionality, or may simply want to disable caching temporarily. Therefore, if a decorator is the way to go, I would prefer this usage:

dec_func = DiskCacher(some_args)(myFunctionForStep2)
step_2_results = dec_func(args)
...

Here, dec_func is the decorated function with the caching functionality, or, we can think of it as an "upgraded" version of the original myFunctionForStep2. This reminds me of another syntax in Python, the with context manager.

The context manager approach

The with context manager is typically used to manage some kind of resource, for instance, a file object for reading or writing:

with open('file_path', 'w') as fout:
    fout.write(some_data)

This ensures that the data (some_data) are flushed to the file and you don’t end up with a corrupted data file. Without using with, you will have to remember to call fout.close() afterwards.
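For reference, the with block above is roughly equivalent to the following try/finally pattern (a minimal sketch, assuming some_data is defined):

fout = open('file_path', 'w')
try:
    fout.write(some_data)
finally:
    # always runs, even if write() raises an exception
    fout.close()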

So another way I’d like my DiskCacher to function is using the with context manager, like this:

with DiskCacher(some_args, myFunctionForStep2) as dec_func:
    step_2_results = dec_func(args)

This should be functionally identical to the decorator approach shown above.

ID for the cached data

I would need some kind of identifier to label the intermediate data that go into the cache. This can’t be a randomly generated, universally unique ID string, because the reader function would never be able to locate the data again the next time the script is run. It also can’t be built from the input arguments of the function, because I would most likely have to serialize some numpy.ndarray, and the size can get quite large. Some kind of hash of the input arguments might work, but it wouldn’t be very readable. Therefore, in the end, I decided to go the easy way and just use a string as the data ID.

The ID is composed from the name of the script file being executed, and a string label that distinguishes one piece of data from the others created in the same script. For instance, in a plot_vertical_structure.py script file, I created vertical composite profiles of air temperature, humidity and geopotential height. The ID (which also goes into the name of the output file) for air temperature could be "airT", giving the file "[plot_vertical_structure]_[airT].nc"; the one for humidity might be "hum", giving "[plot_vertical_structure]_[hum].nc". The user is responsible for coming up with a label for each cached data variable so that nothing gets accidentally overwritten.

Then I need to set a dedicated folder in the file system to save the cache; ~/datasets/quicksave/ is not a bad choice. By making it a keyword argument, I can use a specified folder when needed, or let the function use the default location otherwise.

Multiple return values

And lastly, I’d like my DiskCacher to be able to handle functions with 1 or multiple return values.
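As a quick reminder of how Python handles this (a toy example, not part of the script): a function with multiple return values actually returns a single tuple, which is why the cacher can treat both cases uniformly by packing a single result into a length-1 tuple:

def f():
    return 1, 2       # packed into the tuple (1, 2)

a, b = f()            # unpacked into two names
res = f()             # res is the tuple (1, 2)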

Implementation

Dummy data

The snippet below uses the CDAT package to create a dummy 2D slab as the sample data to test our DiskCacher with (the imports at the top cover everything used in this post):

# Imports used by the snippets in this post
import os
import sys
import functools

import numpy as np
import cdms2 as cdms
import MV2 as MV


def createDummy():
    '''Create a dummy 2D slab with latitude/longitude axes'''

    lats=np.arange(0, 90)
    lons=np.arange(0, 360)
    xx,yy=np.meshgrid(lons, lats)
    z=2*np.sin(xx*np.pi/180) + 3*np.cos(2*yy*np.pi/180)
    z=MV.array(z)

    latax=cdms.createAxis(lats)
    latax.designateLatitude()
    latax.id='y'
    latax.units='degree north'
    latax.name='latitude'

    lonax=cdms.createAxis(lons)
    lonax.designateLongitude()
    lonax.id='x'
    lonax.units='degree east'
    lonax.name='longitude'

    z.setAxisList([latax, lonax])
    z.id='z'
    z.units='1'
    z.long_name='dummy'

    return z

Dummy computation function

I used this dummy function to simulate some complicated computations (just pretend that this is some really time-consuming number crunching):

def func1(var, n, lat1=10, lat2=30):
    '''dummy test function'''

    result=var[slice(1,3)]*n
    result2=var(latitude=(lat1, lat2))*n*2
    print('### Computing in function.\n')

    return result, result2

It accepts a TransientVariable (var), an integer scalar (n), and 2 keyword arguments (lat1=10, lat2=30) as inputs, and returns 2 values, both being TransientVariables.
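Called directly, without any caching involved, it would be used like this (var being the dummy variable from createDummy()):

# plain call, always does the computation
result, result2 = func1(var, 10, lat1=10, lat2=30)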

Function to compose the id/filename

The below formFilename() function is used to compose a path to save the cache data:

def formFilename(folder, varid):
    '''Compose abspath for cache file

    Args:
        folder (str): cache folder path.
        varid (str): variable id to form file name and used as variable id.
    Returns:
        abpath (str): absolute path of the cache file, located inside
            <folder>. The file name has the format:
                [script_file]_[varid].nc
    '''
    # basename() guards against the script being invoked with a path prefix
    script_file=os.path.splitext(os.path.basename(sys.argv[0]))[0]
    name='[%s]_[%s].nc' %(script_file, varid)
    abpath=os.path.join(folder, name)

    return abpath
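As a hypothetical example, if this is run from plot_vertical_structure.py:

abpath = formFilename('/home/guangzhi/datasets/quicksave2/', 'airT')
# abpath == '/home/guangzhi/datasets/quicksave2/[plot_vertical_structure]_[airT].nc'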

Decide whether to re-compute or not

The rule is simple:

  1. If overwrite is True, do the re-computation.
  2. Otherwise, check the existence of the cache files; if they all exist, read the data in.
  3. Otherwise, do the re-computation.

The below function returns a boolean result signifying whether a re-computation is needed:

def doRecomp(folder, varid_list, overwrite):
    '''Check whether to do recomputation or not

    Args:
        folder (str): cache folder path.
        varid_list (list): list of variable ids.
        overwrite (bool): whether to force a new computation or not.
    Returns:
        recmp (bool): if True, need to do recomputation, False otherwise.

    A check is performed on the existence of each file specified in
    <varid_list>, as <folder>/<file_name_for_varii>, where <file_name_for_varii>
    is defined in formFilename(). If all files exist, no need to recompute,
    and return False. If any file is missing, return True.

    If <overwrite> is True, return True to force a recomputation.
    '''

    recmp=False
    for vv in varid_list:
        abpath_in=formFilename(folder, vv)
        if not os.path.exists(abpath_in) or overwrite:
            recmp=True
            break

    return recmp
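A hypothetical usage (folder and variable ids made up for illustration):

# True if any of the 2 cache files is missing, or if overwrite is forced
recompute = doRecomp('/home/guangzhi/datasets/quicksave2/', ['airT', 'hum'], False)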

The DiskCacher class

Here is the core part; let me paste the code first:

class DiskCacher(object):
    def __init__(self, varid, func=None, folder=None, overwrite=False,
            verbose=True):
        '''Disk cache context manager

        Args:
            varid (str or list or tuple): if str, function <func> is assumed
                to return only 1 return value. If list or tuple, each variable
                id corresponds to one return value of <func>.
        Keyword Args:
            func (callable): function object whose return values are to be
                cached.
            folder (str or None): cache folder path. If None, use a default.
            overwrite (bool): whether to force a new computation or not.
            verbose (bool): whether to print some text info.
        '''

        if folder is None:
            self.folder='/home/guangzhi/datasets/quicksave2/'
        else:
            self.folder=folder

        self.func=func
        self.varid=varid
        self.overwrite=overwrite
        self.verbose=verbose

        #---------------Put varids to a list---------------
        self.varid_list=forceList(varid)

    def __enter__(self):
        '''Called in the with statement, return a decorated function
        '''
        if self.func is None:
            raise Exception("Need to provide a callable function to __init__() when used as context manager.")

        return _Cache2Disk(self.func, self.varid_list, self.folder,
                self.overwrite, self.verbose)

    def __exit__(self, type, value, traceback):
        '''Nothing to do after the with block'''
        return

    def __call__(self, func=None):
        _func=func or self.func
        return _Cache2Disk(_func, self.varid_list, self.folder,
                self.overwrite, self.verbose)

I defined a class, rather than a function, for this task because it allows me to make it work as a with context manager. To do so, I need to add an __enter__() and an __exit__() method. The former is responsible for providing the resource being managed (like the fout in with open(file_name, 'w') as fout:), and the latter for any maintenance work after leaving the context, which in this case is nothing. More information regarding context managers implemented with a class can be found here, here and here.

The constructor accepts 1 mandatory argument (varid) and 4 optional ones:

  • The varid is the unique label (or labels if multiple return values are to be cached) the user needs to provide to distinguish the data variables.

  • func is the function to be decorated, e.g. myFunctionForStep2. It can be None, because we’d also like to use this thing as a decorator. See later.

  • folder gives the path to a folder to save the cache, if None, use the default location defined inside __init__().

  • overwrite is used to force a re-computation, as mentioned above.

  • verbose toggles the printing of some progress information.
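One helper not shown above is the forceList() function used in __init__(); it simply makes sure the variable id(s) end up in a list. It is included in the complete script; a minimal version might look like this:

def forceList(x):
    '''Put a single id into a list; leave lists/tuples as they are'''
    if isinstance(x, (list, tuple)):
        return list(x)
    return [x]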

The __enter__() method returns a decorated function by calling the _Cache2Disk() function. Here is its definition:

def _Cache2Disk(func, varid_list, folder, overwrite, verbose):
    '''Inner decorator function

    Args:
        func (callable): function object whose return values are to be
            cached.
        varid_list (list): list of variable ids.
        folder (str): cache folder path.
        overwrite (bool): whether to force a new computation or not.
        verbose (bool): whether to print some text info.
    Returns:
        decorated function: if cache exists, the function is <readCache>
            which will read cached data from disk. If needs to recompute,
            the function is wrapped that the return values are saved to disk
            before returning.
    '''

    def decorator_func(func):

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if not doRecomp(folder, varid_list, overwrite):
                #----------Cache exists, no need to recompute----------
                results=readCache(folder, varid_list, overwrite, verbose)
            else:
                #----------Cache does not exist or overwrite forced, recompute----------
                results=func(*args, **kwargs)
                if isinstance(results, tuple):
                    if len(results)!=len(varid_list):
                        raise Exception("Length of result doesn't equal varid.")
                else:
                    results=(results,)
                if not os.path.exists(folder):
                    os.makedirs(folder)
                writeCache(results, folder, varid_list, verbose)

            if len(results)==1:
                return results[0]
            else:
                return results
        return wrapper

    return decorator_func(func)

If you are not familiar with decorators with arguments, here is some relevant information. Basically, _Cache2Disk defines a function decorator_func(func), and returns the decorated function as the return value (return decorator_func(func)).
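The general shape of such a decorator is an extra level of nesting; here is a toy example unrelated to the caching code (functools was imported at the top of the script):

def repeat(n):
    # outermost level: receives the decorator arguments
    def decorator(func):
        # middle level: receives the function to decorate
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # innermost level: runs at call time
            for _ in range(n):
                result = func(*args, **kwargs)
            return result
        return wrapper
    return decorator

@repeat(3)
def greet():
    print('hi')

greet()   # prints 'hi' 3 times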

What actually gets returned is the wrapper function defined inside decorator_func(). The wrapper is the "upgraded" version of the input function func. The "upgraded" behavior is that it first checks whether a re-computation is needed:

if not doRecomp(folder, varid_list, overwrite):

If not, call the readCache() function to get the results. Otherwise, call the computation function to do the computation:

results=func(*args, **kwargs)

, and also write the results to cache:

writeCache(results, folder, varid_list, verbose)

, so that a future run would not need to do the heavy labor again.

Note that there is also a __call__() method in the class. This is used to decorate a given function, and it also calls the _Cache2Disk() function. The difference is that you can now provide it with a different function (func) from the one given to the __init__() constructor.
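For instance, the same instance can decorate a function different from the one passed to the constructor (function names here are hypothetical):

cacher = DiskCacher(['some_id'])            # no func given to __init__()
dec_func = cacher(someOtherFunction)        # decorate via __call__()
results = dec_func(some_args)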

Reader and writer functions

Then below are the definitions of the reader (readCache()) and writer (writeCache()) functions:

def readCache(folder, varid_list, overwrite=False, verbose=True):
    '''Read cached data

    Args:
        folder (str): cache folder path.
        varid_list (list): list of variable ids.
    Keyword Args:
        overwrite (bool): whether to force a new computation or not.
        verbose (bool): whether to print some text info.
    Returns:
        results (tuple): a tuple containing data read in from cached file(s).

    This func will form a file path for each variable given in <varid_list>,
    and save the read data into a tuple.
    '''

    results=[]
    for vv in varid_list:
        #---------------Form data filename---------------
        abpath_in=formFilename(folder, vv)
        if os.path.exists(abpath_in) and not overwrite:

            if verbose:
                print('\n# <readCache>: Read in variable', vv,
                        'from disk cache:\n', abpath_in)

            fin=cdms.open(abpath_in,'r')
            vii=fin(vv)
            fin.close()
            results.append(vii)

    return tuple(results)

def writeCache(results, folder, varid_list, verbose=True):
    '''Write data to disk cache

    Args:
        results (tuple): a tuple containing the data to be written to cache.
        folder (str): cache folder path.
        varid_list (list): list of variable ids.
    Keyword Args:
        verbose (bool): whether to print some text info.

    This func will form a file path for each variable given in <varid_list>,
    and save each variable into its own cache file.
    '''

    for varii, idii in zip(results, varid_list):
        abpath_in=formFilename(folder, idii)
        fout=cdms.open(abpath_in,'w')
        if verbose:
            print('\n# <writeCache>: Saving output to:\n',abpath_in)
        varii.id=idii
        fout.write(varii,typecode='f')
        fout.close()

    return

I’m using the CDAT package to do the file I/O; you can change it to whatever you prefer.
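For instance, if you use xarray instead of CDAT, a rough (untested) sketch of the reader/writer pair could look like this, assuming each cached variable is stored as a DataArray named after its varid in its own NetCDF file:

import xarray as xr

def readCacheXr(folder, varid_list):
    results = []
    for vv in varid_list:
        abpath_in = formFilename(folder, vv)
        # open the cache file and pull out the variable named <vv>
        with xr.open_dataset(abpath_in) as fin:
            results.append(fin[vv].load())
    return tuple(results)

def writeCacheXr(results, folder, varid_list):
    for varii, idii in zip(results, varid_list):
        abpath_out = formFilename(folder, idii)
        # save each DataArray into its own NetCDF file
        varii.rename(idii).to_netcdf(abpath_out)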

Tests

Time for a test drive. To start fresh, make sure the cache folder is clean, then create the dummy data:

var=createDummy()
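If the cache folder needs cleaning first, something like this would do (assuming the default cache location; use with care):

import glob

for ff in glob.glob('/home/guangzhi/datasets/quicksave2/*.nc'):
    os.remove(ff)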

Usage 1: context manager

#-------------Usage 1: context manager-------------
print('\n\n#########################################')
print('Usage in context manager')

def func1(var, n, lat1=10, lat2=30):
    '''dummy test function'''

    result=var[slice(1,3)]*n
    result2=var(latitude=(lat1, lat2))*n*2
    print('### Computing in function.\n')

    return result, result2

with DiskCacher(['context_a', 'context_b'], func1) as func1b:
    result=func1b(var, 10, lat1=10, lat2=30)

with DiskCacher(['context_a', 'context_b'], func1) as func1b:
    result=func1b(var, 10, lat1=10, lat2=30)

with DiskCacher(['context_a', 'context_b'], func1, overwrite=True)\
        as func1b:
    result=func1b(var, 10, lat1=10, lat2=30)

print('#########################################')

The expected behavior is:

  1. The func1b in the 1st with block is the 1st (decorated) function call, so it has to do the computation, and a ### Computing in function. line is printed out. It saves the cache to disk, with the labels context_a and context_b given to the 2 return values (result, result2).
  2. The func1b in the 2nd with block is the 2nd (decorated) function call; because the labels are the same and overwrite defaults to False, it reads from the cache.
  3. The func1b in the 3rd with block is the 3rd function call, with overwrite=True, so it is forced to do a re-computation.

Below is the terminal output:

Usage in context manager
### Computing in function.

# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[context_a].nc

# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[context_b].nc

# <readCache>: Read in variable context_a from disk cache:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[context_a].nc

# <readCache>: Read in variable context_b from disk cache:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[context_b].nc
### Computing in function.

# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[context_a].nc

# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[context_b].nc
#########################################

Usage 2: wrap a function

We can temporarily wrap/decorate a given computation function, like so:

print('\n\n#########################################')
print('Usage as function wrap')

def func2(var, n, lat1=10, lat2=30):
    '''dummy test function'''

    result=var[slice(1,3)]*n
    result2=var(latitude=(lat1, lat2))*n*2
    print('### Computing in function.\n')

    return result, result2

func2b=DiskCacher(['wrap_a', 'wrap_b'])(func2)
result=func2b(var, 10, lat1=10, lat2=30)

func2b=DiskCacher(['wrap_a', 'wrap_b'])(func2)
result=func2b(var, 10, lat1=10, lat2=30)

func2b=DiskCacher(['wrap_a', 'wrap_b'], overwrite=True)(func2)
result=func2b(var, 10, lat1=10, lat2=30)

print('#########################################')

The expected behaviors are similar to before: the 1st time is a fresh run, the 2nd reads from the cache, and the 3rd forces a re-computation. Below is the output:

#########################################
Usage as function wrap
### Computing in function.

# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[wrap_a].nc

# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[wrap_b].nc

# <readCache>: Read in variable wrap_a from disk cache:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[wrap_a].nc

# <readCache>: Read in variable wrap_b from disk cache:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[wrap_b].nc
### Computing in function.

# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[wrap_a].nc

# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[wrap_b].nc
#########################################

Usage 3: permanently decorate a function

DiskCacher can be used with the pie syntax to permanently decorate a function, like so:

print('\n\n#########################################')
print('Usage as function decorator')

@DiskCacher(['dec_a', 'dec_b'])
def func3(var, n, lat1=10, lat2=30):
    '''dummy test function'''

    result=var[slice(1,3)]*n
    result2=var(latitude=(lat1, lat2))*n*2
    print('### Computing in function.\n')

    return result, result2

result=func3(var, 10, lat1=10, lat2=30)
result=func3(var, 10, lat1=10, lat2=30)

print('#########################################')

Note that this time we lose the ability to toggle the overwrite flag at call time (it is fixed when the function is decorated), which is also why I prefer the previous 2 approaches. Below is the output:

#########################################
Usage as function decorator
### Computing in function.

# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[dec_a].nc

# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[dec_b].nc

# <readCache>: Read in variable dec_a from disk cache:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[dec_a].nc

# <readCache>: Read in variable dec_b from disk cache:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[dec_b].nc
#########################################
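One partial workaround, not part of the original script: because functools.wraps stores the original function in the __wrapped__ attribute (in Python 3), the undecorated function can still be recovered and re-wrapped with overwrite=True when needed:

raw_func3 = func3.__wrapped__                 # the original, undecorated function
func3b = DiskCacher(['dec_a', 'dec_b'], overwrite=True)(raw_func3)
result = func3b(var, 10, lat1=10, lat2=30)    # forces a re-computation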

Conclusion

When data are too big to fit in the RAM, one may choose to cache the intermediate results to the disk instead. So long as the time needed to load the cache is shorter than the time to re-compute, it will be a worthwhile trade. This can also make the code cleaner and easier to maintain.

In this post we created a DiskCacher class that can be used as a with context manager, as a function wrapper and as a pie-syntax decorator. It modifies a given computation function, tries to use the cached data if possible, and only does the re-computation if the cache is not found or the user forces an overwrite. It uses a simple string ID to label the cached data. The ID is composed from the name of the Python script file being executed and a user-defined string label. The former helps distinguish data created by different scripts, and the latter helps distinguish data within the same script. Overall, DiskCacher is a combination of the lazy evaluation and memoization ideas introduced in Lazy evaluation and memoization in Python computations.
