In a previous post we talked about in-memory caching using a dictionary in Python. Sometimes the data may be too big to fit into RAM, while the time required to re-compute them is significantly longer than the time needed to read them back from disk. In such situations, one may choose to cache the data to disk, and load them back to bypass the re-computation. In this post, I’ll share my implementation of some classes/functions that cache NetCDF data to disk. The complete script file can be obtained from here.
Rationale
In my research work, I often have to write Python scripts that perform some analyses made up of multiple steps, say
- Step 1: read in data.
- Step 2: compute to get intermediate results.
- Step 3: create plots using results from Step 2.
And the same script has to be executed over and over again as I make adjustments to the code to fix problems, tweak parameters in the algorithms, or modify the plot layout, etc. For this kind of workflow, it would be beneficial if I could cache the intermediate results of Step 2 somewhere, so that when I adjust the code in Step 3, I can read the results from the cache rather than re-computing them repeatedly.
A naive and primitive solution would be to add a new code block between Step 2 and Step 3 to save the results from Step 2, comment out the computation code of Step 2, and replace it with a block that reads the data back from disk. The new script structure would be like:
- Step 1: read in data.
- Commented out Step 2: compute to get intermediate results.
- New Step 2a: Read in intermediate results from disk.
- New Step 2b: Save intermediate results to disk.
- Step 3: create plots using results from Step 2.
Apparently, New Step 2a and New Step 2b cannot co-exist. I have to use New Step 2b first to save to disk, then comment it out and uncomment New Step 2a for subsequent executions of the script. If I need to change the computation of Step 2, I have to revive the old Step 2 and New Step 2b, comment out New Step 2a, and run the script to save the updated intermediate results, then comment/uncomment the code blocks to use the disk cache again. Needless to say, this gets confusing and tedious quickly, and both steps are prone to mistakes.
Another solution would be to break the script into 2 separate ones: in `script_part1`, do Step 1 and Step 2, then save the intermediate results to disk. In `script_part2`, read in the intermediate results from disk and create plots. This is a cleaner solution than the previous one; however, it is still not ideal in my opinion. Imagine the entire analysis workflow consists of 4, 5 or more steps instead of just 3: then we would have more sub-part scripts, and more intermediate data files to maintain.
Design choice
Basic idea
I feel that a disk cache would be a better solution than the above 2. The idea is to modify the function that creates the intermediate results in Step 2, such that the results are saved to a dedicated location on the hard drive the first time it is run. For subsequent executions, it first checks the cache location, and reads in the data if they are there, or does a re-computation otherwise. There would also need to be an `overwrite` flag so that after adjustments to the computation code, I could force a re-computation by setting `overwrite` to `True`, rather than going into the cache location and deleting the cache file myself.
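The idea above can be sketched in a few lines of generic Python. This is only a simplified sketch (using `pickle` rather than NetCDF, and `cachedCompute` is a name made up for this illustration), but it contains all the moving parts: a cache check, a read path, a compute-and-save path, and an `overwrite` escape hatch:

```python
import os
import pickle

def cachedCompute(compute_func, cache_path, overwrite=False):
    '''Minimal sketch: load from cache if present, otherwise compute and save.'''
    if os.path.exists(cache_path) and not overwrite:
        # cache hit: read the saved result back instead of re-computing
        with open(cache_path, 'rb') as fin:
            return pickle.load(fin)
    # cache miss (or forced overwrite): compute, then save for next time
    result = compute_func()
    with open(cache_path, 'wb') as fout:
        pickle.dump(result, fout)
    return result
```

The real implementation below follows the same logic, just with NetCDF I/O and wrapped into a decorator/context manager.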
The decorator approach
This design idea is really suggestive of the decorator approach. The core function responsible for the computation of Step 2 stays intact, and we use a decorator to wrap the caching and reading functionality around it. That’s a plausible design choice.
But I may not want to permanently modify the core function’s behavior by using the pie syntax of Python, like:
```python
@DiskCacher(some_args)
def myFunctionForStep2():
    ...
```
This is because I may want to re-use `myFunctionForStep2()` somewhere else where I don’t want this caching functionality, or may simply want to disable caching temporarily. Therefore, if a decorator is the way to go, I would prefer this usage:
```python
dec_func = DiskCacher(some_args)(myFunctionForStep2)
step_2_results = dec_func(args)
...
```
Here, `dec_func` is the decorated function with the caching functionality, or, we can think of it as an "upgraded" version of the original `myFunctionForStep2`. This reminds me of another syntax in Python: the `with` context manager.
The context manager approach
The `with` context manager is typically used in the management of some kind of resource, for instance, a file object for reading or writing:

```python
with open('file_path', 'w') as fout:
    fout.write(some_data)
```

This ensures that the data (`some_data`) are flushed to the file and you don’t end up with a corrupted data file. Without using `with`, you would have to remember to call `fout.close()` afterwards.
So another way I’d like my `DiskCacher` to function is as a `with` context manager, like this:
```python
with DiskCacher(some_args, myFunctionForStep2) as dec_func:
    step_2_results = dec_func(args)
```
This should be functionally identical to the decorator approach shown above.
ID for the cached data
I would need some kind of identifier to label the intermediate data that go into the cache. This can’t be a randomly generated, universally unique ID string, because the reader function would never be able to locate the data again the next time the script is run. It also can’t be the input arguments to the function themselves, because I would most likely have to serialize some `numpy.ndarray`s, and the size can get quite large. Some kind of hash of the input arguments might be better, but it wouldn’t be very readable. Therefore, in the end, I decided to go the easy way and just use a string as the data ID.
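For reference, a hashed-arguments ID might look like the sketch below (`hashId` is a made-up helper, not part of the final implementation); the hex digest output shows why such IDs are not very readable:

```python
import hashlib

def hashId(*args, **kwargs):
    '''Build a deterministic (but unreadable) id from the repr of the inputs.'''
    key = repr(args) + repr(sorted(kwargs.items()))
    # md5 gives a fixed-length 32-char hex string
    return hashlib.md5(key.encode('utf-8')).hexdigest()
```

The same inputs always hash to the same ID, so lookups would work; but a file named after a hex digest tells you nothing about its contents, which is why I went with readable string labels instead.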
The ID is composed using the name of the script file being executed, and a string label that distinguishes one piece of data from the others created in the same script. For instance, in the `plot_vertical_structure.py` script file, I created vertical composite profiles of air temperature, humidity and geopotential height. The ID (which is also the name of the output file) for air temperature would be `"plot_vertical_structure_airT.nc"`, and the one for humidity might be `"plot_vertical_structure_hum.nc"`. The user is responsible for coming up with a label for each cached data variable so that no overwriting happens.
Then I need to set a dedicated folder in the file system to save the cache; `~/datasets/quicksave/` is not a bad choice. By making it a keyword argument, I can use a specified folder when needed, or let the function use the default location otherwise.
Multiple return values
And lastly, I’d like my `DiskCacher` to be able to handle functions with 1 or multiple return values.
Implementation
Dummy data
The below snippet uses the CDAT package to create a dummy 2D slab as the sample data to test our `DiskCacher` with:
```python
def createDummy():
    lats = np.arange(0, 90)
    lons = np.arange(0, 360)
    xx, yy = np.meshgrid(lons, lats)
    z = 2*np.sin(xx*np.pi/180) + 3*np.cos(2*yy*np.pi/180)
    z = MV.array(z)

    latax = cdms.createAxis(lats)
    latax.designateLatitude()
    latax.id = 'y'
    latax.units = 'degree north'
    latax.name = 'latitude'

    lonax = cdms.createAxis(lons)
    lonax.designateLongitude()
    lonax.id = 'x'
    lonax.units = 'degree east'
    lonax.name = 'longitude'

    z.setAxisList([latax, lonax])
    z.id = 'z'
    z.units = '1'
    z.long_name = 'dummy'

    return z
```
Dummy computation function
I used this dummy function to simulate some complicated computations (just pretend that this is some really time-consuming number crunching):
```python
def func1(var, n, lat1=10, lat2=30):
    '''dummy test function'''
    result = var[slice(1, 3)]*n
    result2 = var(latitude=(lat1, lat2))*n*2
    print('### Computing in function.\n')
    return result, result2
```
It accepts a `TransientVariable` (`var`), an integer scalar (`n`), and 2 keyword arguments (`lat1=10, lat2=30`) as inputs, and returns 2 values, both being `TransientVariable`s.
Function to compose the id/filename
The below `formFilename()` function is used to compose a path to save the cache data:
```python
def formFilename(folder, varid):
    '''Compose abspath for cache file

    Args:
        folder (str): cache folder path.
        varid (str): variable id to form file name and used as variable id.
    Returns:
        abpath (str): abspath for cache file, inside <folder>. The file
            name is in the format: [script_file]_[varid].nc
    '''
    script_file = os.path.splitext(sys.argv[0])[0]
    name = '[%s]_[%s].nc' % (script_file, varid)
    abpath = os.path.join(folder, name)

    return abpath
```
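As a quick illustration of the naming scheme, the same composition rule can be re-implemented standalone, with the script name passed in explicitly instead of taken from `sys.argv[0]` (`formFilenameDemo` is made up for this example):

```python
import os

def formFilenameDemo(folder, varid, script_file):
    '''Same composition rule as formFilename(), but with the script
    name given explicitly, for illustration.'''
    name = '[%s]_[%s].nc' % (script_file, varid)
    return os.path.join(folder, name)
```

So, running from a script named `plot_vertical_structure.py`, the cache file for a variable labeled `airT` would land at `<folder>/[plot_vertical_structure]_[airT].nc`.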
Decide whether to re-compute or not
The rule is simple:
- If `overwrite` is `True`, do a re-computation.
- Otherwise, check the existence of the cache files: if they all exist, read the data in.
- Otherwise (some cache file is missing), do a re-computation.
The below function returns a boolean result signifying whether a re-computation is needed:
```python
def doRecomp(folder, varid_list, overwrite):
    '''Check whether to do recomputation or not

    Args:
        folder (str): cache folder path.
        varid_list (list): list of variable ids.
        overwrite (bool): whether to force a new computation or not.
    Returns:
        recmp (bool): if True, need to do recomputation, False otherwise.

    A check is performed on the existence of each file specified in
    <varid_list>, as <folder>/<file_name_for_varii>. <file_name_for_varii>
    is defined in formFilename(). If all files exist, no need to recompute,
    and return False. If any file is missing, return True. If <overwrite>
    is True, return True to force a recomputation.
    '''
    recmp = False
    for vv in varid_list:
        abpath_in = formFilename(folder, vv)
        if not os.path.exists(abpath_in) or overwrite:
            recmp = True
            break

    return recmp
```
The `DiskCacher` class
Here is the core part. The code first:
```python
class DiskCacher(object):
    def __init__(self, varid, func=None, folder=None, overwrite=False,
                 verbose=True):
        '''Disk cache context manager

        Args:
            varid (str or list or tuple): if str, function <func> is assumed
                to return only 1 value. If list or tuple, each variable id
                corresponds to one return value of <func>.
        Keyword Args:
            func (callable): function object whose return values are to be
                cached.
            folder (str or None): cache folder path. If None, use a default.
            overwrite (bool): whether to force a new computation or not.
            verbose (bool): whether to print some text info.
        '''
        if folder is None:
            self.folder = '/home/guangzhi/datasets/quicksave2/'
        else:
            self.folder = folder

        self.func = func
        self.varid = varid
        self.overwrite = overwrite
        self.verbose = verbose

        #---------------Put varids to a list---------------
        self.varid_list = forceList(varid)

    def __enter__(self):
        '''Called in the with statement, return a decorated function'''
        if self.func is None:
            raise Exception("Need to provide a callable function to __init__() when used as context manager.")

        return _Cache2Disk(self.func, self.varid_list, self.folder,
                           self.overwrite, self.verbose)

    def __exit__(self, type, value, traceback):
        '''Nothing to do after with'''
        return

    def __call__(self, func=None):
        _func = func or self.func
        return _Cache2Disk(_func, self.varid_list, self.folder,
                           self.overwrite, self.verbose)
```
I defined a class, rather than a function, for this task because it allows me to make it work as a `with` context manager. To do so, I need to add an `__enter__()` and an `__exit__()` method. The former is responsible for providing the resource being managed (like the `fout` in `with open(file_name, 'w') as fout:`), and the latter for any maintenance work after leaving the context; in this case, nothing. More information regarding context managers implemented with a class can be found here, here and here.
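As a stripped-down illustration of the protocol, here is a toy context manager class (`Managed` is made up for this example), showing that the return value of `__enter__()` is what gets bound after `as`, and that `__exit__()` fires when the block is left:

```python
class Managed(object):
    '''Toy context manager: records enter/exit calls.'''
    def __init__(self):
        self.events = []

    def __enter__(self):
        # the return value here is what gets bound by `as`
        self.events.append('enter')
        return self

    def __exit__(self, type, value, traceback):
        # clean-up hook; returning None lets exceptions propagate
        self.events.append('exit')
        return
```

In `DiskCacher`, `__enter__()` returns the decorated function instead of `self`, and `__exit__()` has nothing to clean up.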
The constructor accepts 1 mandatory argument (`varid`) and 4 optional ones:

- `varid` is the unique label (or labels, if multiple return values are to be cached) the user needs to provide to distinguish the data variables.
- `func` is the function to be decorated, e.g. `myFunctionForStep2`. It can be `None`, because we’d also like to use this thing as a decorator. See later.
- `folder` gives the path to a folder to save the cache. If `None`, use the default location defined inside `__init__()`.
- `overwrite` is used to force a re-computation, as mentioned above.
- `verbose` toggles the printing of some progress info.
The `__enter__()` method returns a decorated function by calling the `_Cache2Disk()` function. Here is its definition:
```python
def _Cache2Disk(func, varid_list, folder, overwrite, verbose):
    '''Inner decorator function

    Args:
        func (callable): function object whose return values are to be cached.
        varid_list (list): list of variable ids.
        folder (str): cache folder path.
        overwrite (bool): whether to force a new computation or not.
        verbose (bool): whether to print some text info.
    Returns:
        decorated function: if cache exists, it reads the cached data from
        disk via readCache(). If a recomputation is needed, it calls <func>
        and saves the return values to disk before returning them.
    '''
    def decorator_func(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if not doRecomp(folder, varid_list, overwrite):
                #----------Cache exists, do not recompute----------
                results = readCache(folder, varid_list, overwrite, verbose)
            else:
                #----------Cache not exists, do recompute----------
                results = func(*args, **kwargs)
                if isinstance(results, tuple):
                    if len(results) != len(varid_list):
                        raise Exception("Length of result doesn't equal varid.")
                else:
                    results = (results,)

                if not os.path.exists(folder):
                    os.makedirs(folder)
                writeCache(results, folder, varid_list, verbose)

            if len(results) == 1:
                return results[0]
            else:
                return results

        return wrapper

    return decorator_func(func)
```
If you are not familiar with decorators with arguments, here is some relevant information. Basically, `_Cache2Disk` defines a function decorator `decorator_func(func)`, and returns the decorated function as the return value (`return decorator_func(func)`).
What actually gets returned is the `wrapper` function defined inside `decorator_func`. The `wrapper` is the "upgraded" version of the input function `func`. The "upgraded" behavior is that it first checks whether a re-computation is needed:
```python
if not doRecomp(folder, varid_list, overwrite):
```
If not, it calls the `readCache()` function to get the results. Otherwise, it calls the computation function to do the computation (`results = func(*args, **kwargs)`), and also writes the results to the cache (`writeCache(results, folder, varid_list, verbose)`), so that a future run won’t need to do the heavy labor again.
Note that there is also a `__call__()` method in the class. This is used to decorate a given function, and it also calls the `_Cache2Disk()` function. The difference is that you can now provide it with a different function (`func`) than the one given to the `__init__()` constructor.
Reader and writer functions
Then below are the definitions of the reader (`readCache()`) and writer (`writeCache()`) functions:
```python
def readCache(folder, varid_list, overwrite=False, verbose=True):
    '''Read cached data

    Args:
        folder (str): cache folder path.
        varid_list (list): list of variable ids.
    Keyword Args:
        overwrite (bool): whether to force a new computation or not.
        verbose (bool): whether to print some text info.
    Returns:
        results (tuple): a tuple containing data read in from cached file(s).

    This func will form a file path for each variable given in <varid_list>,
    and collect the read data into a tuple.
    '''
    results = []
    for vv in varid_list:
        #---------------Form data filename---------------
        abpath_in = formFilename(folder, vv)
        if os.path.exists(abpath_in) and not overwrite:
            if verbose:
                print('\n# <readCache>: Read in variable', vv,
                      'from disk cache:\n', abpath_in)
            fin = cdms.open(abpath_in, 'r')
            vii = fin(vv)
            fin.close()
            results.append(vii)

    return tuple(results)


def writeCache(results, folder, varid_list, verbose=True):
    '''Write data to disk cache

    Args:
        results (tuple): a tuple containing data to cache.
        folder (str): cache folder path.
        varid_list (list): list of variable ids.
    Keyword Args:
        verbose (bool): whether to print some text info.

    This func will form a file path for each variable given in <varid_list>,
    and save the data into a cache file.
    '''
    for varii, idii in zip(results, varid_list):
        abpath_in = formFilename(folder, idii)
        fout = cdms.open(abpath_in, 'w')
        if verbose:
            print('\n# <writeCache>: Saving output to:\n', abpath_in)
        varii.id = idii
        fout.write(varii, typecode='f')
        fout.close()

    return
```
I’m using the CDAT package to do the file I/O; you can change to whatever you prefer.
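For instance, a numpy-based replacement would keep the same structure. Below is a rough sketch using plain `.npy` files (the names `readCacheNpy`/`writeCacheNpy` are mine, and the `overwrite` handling is omitted for brevity):

```python
import os
import numpy as np

def readCacheNpy(folder, varid_list):
    '''Read cached numpy arrays, one .npy file per variable id.'''
    results = []
    for vv in varid_list:
        abpath_in = os.path.join(folder, vv + '.npy')
        results.append(np.load(abpath_in))
    return tuple(results)

def writeCacheNpy(results, folder, varid_list):
    '''Save each array in <results> under its variable id.'''
    if not os.path.exists(folder):
        os.makedirs(folder)
    for varii, idii in zip(results, varid_list):
        np.save(os.path.join(folder, idii + '.npy'), varii)
```

The trade-off is losing the metadata (axes, units, long names) that the NetCDF format preserves, which is why I went with CDAT in my own work.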
Tests
Time for a test drive. To start fresh, make sure the cache folder is clean; then we create the dummy data:
```python
var = createDummy()
```
Usage 1: context manager
```python
#-------------Usage 1: context manager-------------
print('\n\n#########################################')
print('Usage in context manager')

def func1(var, n, lat1=10, lat2=30):
    '''dummy test function'''
    result = var[slice(1, 3)]*n
    result2 = var(latitude=(lat1, lat2))*n*2
    print('### Computing in function.\n')
    return result, result2

with DiskCacher(['context_a', 'context_b'], func1) as func1b:
    result = func1b(var, 10, lat1=10, lat2=30)

with DiskCacher(['context_a', 'context_b'], func1) as func1b:
    result = func1b(var, 10, lat1=10, lat2=30)

with DiskCacher(['context_a', 'context_b'], func1, overwrite=True)\
        as func1b:
    result = func1b(var, 10, lat1=10, lat2=30)

print('#########################################')
```
The expected behavior is:
- `func1b` in the 1st `with` block is the 1st (decorated) function call, so it will have to do the computation, and a `### Computing in function.` line will be printed out. It will save the cache to disk, with the labels `context_a` and `context_b` given to the 2 return values (`result`, `result2`).
- `func1b` in the 2nd `with` block is the 2nd (decorated) function call; because the labels are the same and `overwrite` defaults to `False`, it will read from the cache.
- `func1b` in the 3rd `with` block is the 3rd function call, with `overwrite=True`, so it will be forced to do a re-computation.
Below is the terminal output:
```
Usage in context manager
### Computing in function.


# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[context_a].nc

# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[context_b].nc

# <readCache>: Read in variable context_a from disk cache:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[context_a].nc

# <readCache>: Read in variable context_b from disk cache:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[context_b].nc
### Computing in function.


# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[context_a].nc

# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[context_b].nc
#########################################
```
Usage 2: wrap a function
We can temporarily wrap/decorate a given computation function, like so:
```python
print('\n\n#########################################')
print('Usage as function wrap')

def func2(var, n, lat1=10, lat2=30):
    '''dummy test function'''
    result = var[slice(1, 3)]*n
    result2 = var(latitude=(lat1, lat2))*n*2
    print('### Computing in function.\n')
    return result, result2

func2b = DiskCacher(['wrap_a', 'wrap_b'])(func2)
result = func2b(var, 10, lat1=10, lat2=30)

func2b = DiskCacher(['wrap_a', 'wrap_b'])(func2)
result = func2b(var, 10, lat1=10, lat2=30)

func2b = DiskCacher(['wrap_a', 'wrap_b'], overwrite=True)(func2)
result = func2b(var, 10, lat1=10, lat2=30)

print('#########################################')
```
The expected behaviors are similar to before: the 1st call is a fresh run, the 2nd reads the cache, and the 3rd forces a re-computation. Below is the output:
```
#########################################
Usage as function wrap
### Computing in function.


# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[wrap_a].nc

# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[wrap_b].nc

# <readCache>: Read in variable wrap_a from disk cache:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[wrap_a].nc

# <readCache>: Read in variable wrap_b from disk cache:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[wrap_b].nc
### Computing in function.


# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[wrap_a].nc

# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[wrap_b].nc
#########################################
```
Usage 3: permanently decorate a function
`DiskCacher` can be used with the pie syntax to permanently decorate a function, like so:
```python
print('\n\n#########################################')
print('Usage as function decorator')

@DiskCacher(['dec_a', 'dec_b'])
def func3(var, n, lat1=10, lat2=30):
    '''dummy test function'''
    result = var[slice(1, 3)]*n
    result2 = var(latitude=(lat1, lat2))*n*2
    print('### Computing in function.\n')
    return result, result2

result = func3(var, 10, lat1=10, lat2=30)
result = func3(var, 10, lat1=10, lat2=30)

print('#########################################')
```
Note that this time we lose the ability to specify the `overwrite` flag; that’s also why I prefer the previous 2 approaches. Below is the output:
```
#########################################
Usage as function decorator
### Computing in function.


# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[dec_a].nc

# <writeCache>: Saving output to:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[dec_b].nc

# <readCache>: Read in variable dec_a from disk cache:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[dec_a].nc

# <readCache>: Read in variable dec_b from disk cache:
 /home/guangzhi/datasets/quicksave2/[diskcacher]_[dec_b].nc
#########################################
```
Conclusion
When data are too big to fit in the RAM, one may choose to cache the intermediate results to the disk instead. So long as the time needed to load the cache is shorter than the time to re-compute, it will be a worthwhile trade. This can also make the code cleaner and easier to maintain.
In this post we created a `DiskCacher` class that can be used as a `with` context manager, a function wrapper, and a pie-syntax decorator. It modifies a given computation function to use the cached data if possible, and only does the re-computation if the cache is not found or the user forces an overwrite. It uses a simple string ID to label the cached data. The ID is composed from the name of the Python script file being executed and a user-defined string label: the former helps distinguish data created by different scripts, and the latter helps distinguish data within the same script. Overall, `DiskCacher` is a combination of the lazy evaluation and memoization ideas introduced in Lazy evaluation and memoization in Python computations.