Source code for mdtraj.formats.netcdf

##############################################################################
# MDTraj: A Python Library for Loading, Saving, and Manipulating
#         Molecular Dynamics Trajectories.
# Copyright 2012-2013 Stanford University and the Authors
#
# Authors: Robert McGibbon
# Contributors:
#
# MDTraj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 2.1
# of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with MDTraj. If not, see <http://www.gnu.org/licenses/>.
##############################################################################


"""
This module provides the ability to read and write AMBER NetCDF trajectories.

The code is heavily based on amber_netcdf_trajectory_tools.py by John Chodera.
"""

import os
import socket
import warnings
from datetime import datetime

import numpy as np

import mdtraj
from mdtraj.formats.registry import FormatRegistry
from mdtraj.utils import cast_indices, ensure_type, import_, in_units_of

__all__ = ["NetCDFTrajectoryFile", "load_netcdf"]

##############################################################################
# classes
##############################################################################


[docs] @FormatRegistry.register_loader(".nc") @FormatRegistry.register_loader(".netcdf") @FormatRegistry.register_loader(".ncdf") def load_netcdf(filename, top=None, stride=None, atom_indices=None, frame=None): """Load an AMBER NetCDF file. Since the NetCDF format doesn't contain information to specify the topology, you need to supply a topology Parameters ---------- filename : path-like filename of AMBER NetCDF file. top : {str, Trajectory, Topology} The NetCDF format does not contain topology information. Pass in either the path to a pdb file, a trajectory, or a topology to supply this information. stride : int, default=None Only read every stride-th frame atom_indices : array_like, optional If not None, then read only a subset of the atoms coordinates from the file. This may be slightly slower than the standard read because it requires an extra copy, but will save memory. frame : int, optional Use this option to load only a single frame from a trajectory on disk. If frame is None, the default, the entire trajectory will be loaded. If supplied, ``stride`` will be ignored. Returns ------- trajectory : md.Trajectory The resulting trajectory, as an md.Trajectory object. See Also -------- mdtraj.NetCDFTrajectoryFile : Low level interface to NetCDF files """ from mdtraj.core.trajectory import _parse_topology if top is None: raise ValueError('"top" argument is required for load_netcdf') topology = _parse_topology(top) atom_indices = cast_indices(atom_indices) with NetCDFTrajectoryFile(filename) as f: if frame is not None: f.seek(frame) n_frames = 1 else: n_frames = None return f.read_as_traj( topology, n_frames=n_frames, atom_indices=atom_indices, stride=stride, )
[docs] @FormatRegistry.register_fileobject(".nc") @FormatRegistry.register_fileobject(".netcdf") @FormatRegistry.register_fileobject(".ncdf") class NetCDFTrajectoryFile: """Interface for reading and writing to AMBER NetCDF files. This is a file-like object, that supports both reading or writing depending on the `mode` flag. It implements the context manager protocol, so you can also use it with the python 'with' statement. Parameters ---------- filename : path-like The name of the file to open mode : {'r', 'w'}, default='r' The mode in which to open the file. Valid options are 'r' and 'w' for 'read' and 'write', respectively. force_overwrite : bool, default=False In write mode, if a file named `filename` already exists, clobber it and overwrite it. """ distance_unit = "angstroms"
[docs] def __init__(self, filename, mode="r", force_overwrite=True): self._closed = True # is the file currently closed? self._mode = mode # what mode were we opened in try: # import netcdf4 if it's available import netCDF4 netcdf = netCDF4.Dataset # set input args for netCDF4 input_args = {"format": "NETCDF3_64BIT", "clobber": force_overwrite} except ImportError: import scipy.io netcdf = scipy.io.netcdf_file warning_message = ( "Warning: The 'netCDF4' Python package is not installed. MDTraj is using the 'scipy' " "implementation to read and write netCDF files,which can be significantly slower.\n" "For improved performance, consider installing netCDF4. See installation instructions at:\n" "https://unidata.github.io/netcdf4-python/#quick-install" ) warnings.warn(warning_message) # input args for scipy.io.netcdf_file # AMBER uses the NetCDF3 format, with 64 bit encodings, which # for scipy.io.netcdf_file is "version=2" input_args = {"version": 2} if mode not in ["r", "w"]: raise ValueError("mode must be one of ['r', 'w']") if mode == "w" and not force_overwrite and os.path.exists(filename): raise OSError('"%s" already exists' % filename) self._handle = netcdf(filename, mode=mode, **input_args) self._closed = False # self._frame_index is the current frame that we're at in the # file # self._needs_initialization indicates whether we need to set the # global properties of the file. This is required before the first # write operation on a new file if mode == "w": self._frame_index = 0 self._needs_initialization = True elif mode == "r": self._frame_index = 0 self._needs_initialization = False else: raise RuntimeError()
@property def n_atoms(self): self._validate_open() if self._needs_initialization: raise OSError("The file is uninitialized.") try: # netCDF4 requires use of .size on dimensions # to get size as integer. return self._handle.dimensions["atom"].size except AttributeError: # scipy case return self._handle.dimensions["atom"] @property def n_frames(self): self._validate_open() if not self._needs_initialization: return self._handle.variables["coordinates"].shape[0] return 0 def _validate_open(self): if self._closed: raise OSError("The file is closed.") def read_as_traj(self, topology, n_frames=None, stride=None, atom_indices=None): """Read a trajectory from a NetCDF file Parameters ---------- topology : Topology The system topology n_frames : int, optional If positive, then read only the next `n_frames` frames. Otherwise read all of the frames in the file. stride : np.ndarray, optional Read only every stride-th frame. atom_indices : array_like, optional If not none, then read only a subset of the atoms coordinates from the file. This may be slightly slower than the standard read because it required an extra copy, but will save memory. Returns ------- trajectory : Trajectory A trajectory object containing the loaded portion of the file. """ from mdtraj.core.trajectory import Trajectory if atom_indices is not None: topology = topology.subset(atom_indices) xyz, time, cell_lengths, cell_angles = self.read( n_frames=n_frames, stride=stride, atom_indices=atom_indices, ) if len(xyz) == 0: return Trajectory(xyz=np.zeros((0, topology.n_atoms, 3)), topology=topology) xyz = in_units_of( xyz, self.distance_unit, Trajectory._distance_unit, inplace=True, ) cell_lengths = in_units_of( cell_lengths, self.distance_unit, Trajectory._distance_unit, inplace=True, ) return Trajectory( xyz=xyz, topology=topology, time=time, unitcell_lengths=cell_lengths, unitcell_angles=cell_angles, ) def read(self, n_frames=None, stride=None, atom_indices=None): """Read data from a molecular dynamics trajectory in the AMBER NetCDF format. Parameters ---------- n_frames : int, optional If n_frames is not None, the next n_frames of data from the file will be read. Otherwise, all of the frames in the file will be read. stride : int, optional If stride is not None, read only every stride-th frame from disk. atom_indices : np.ndarray, dtype=int, optional The specific indices of the atoms you'd like to retrieve. If not supplied, all of the atoms will be retrieved. Returns ------- coordinates : np.ndarray, shape=(n_frames, n_atoms, 3) The cartesian coordinates of the atoms, in units of angstroms. time : np.ndarray, None The time corresponding to each frame, in units of picoseconds, or None if no time information is present in the trajectory. cell_lengths : np.ndarray, None The lengths (a,b,c) of the unit cell for each frame, or None if the information is not present in the file. cell_angles : np.ndarray, None The angles (\alpha, \beta, \\gamma) defining the unit cell for each frame, or None if the information is not present in the file. """ self._validate_open() if self._mode != "r": raise OSError( "The file was opened in mode=%s. Reading is not allowed." % self._mode, ) if n_frames is None: n_frames = np.inf elif stride is not None: # 'n_frames' frames should be read in total n_frames *= stride total_n_frames = self.n_frames frame_slice = slice( self._frame_index, self._frame_index + min(n_frames, total_n_frames), stride, ) if self._frame_index >= total_n_frames: # just return something that'll look like len(xyz) == 0 # this is basically just an alternative to throwing an indexerror return np.array([]), None, None, None if atom_indices is None: # get all of the atoms atom_slice = slice(None) else: atom_slice = ensure_type( atom_indices, dtype=int, ndim=1, name="atom_indices", warn_on_cast=False, ) if not np.all(atom_slice < self.n_atoms): raise ValueError( "As a zero-based index, the entries in " "atom_indices must all be less than the number of atoms " "in the trajectory, %d" % self.n_atoms, ) if not np.all(atom_slice >= 0): raise ValueError( "The entries in atom_indices must be greater " "than or equal to zero", ) if "coordinates" in self._handle.variables: coordinates = self._handle.variables["coordinates"][ frame_slice, atom_slice, :, ] else: raise ValueError( "No coordinates found in the NetCDF file. The only " "variables in the file were %s" % self._handle.variables.keys(), ) if "time" in self._handle.variables: time = self._handle.variables["time"][frame_slice] else: time = None if "cell_lengths" in self._handle.variables: cell_lengths = self._handle.variables["cell_lengths"][frame_slice] else: cell_lengths = None if "cell_angles" in self._handle.variables: cell_angles = self._handle.variables["cell_angles"][frame_slice] else: cell_angles = None if cell_lengths is None and cell_angles is not None: warnings.warn("cell_lengths were found, but no cell_angles") if cell_lengths is not None and cell_angles is None: warnings.warn("cell_angles were found, but no cell_lengths") self._frame_index = self._frame_index + min(n_frames, total_n_frames) # scipy.io.netcdf variables are mem-mapped, and are only backed # by valid memory while the file handle is open. This is _bad_. # because we need to support the user opening the file, reading # the coordinates, and then closing it, and still having the # coordinates be a valid memory segment. # https://github.com/rmcgibbo/mdtraj/issues/440 if coordinates is not None and not coordinates.flags["WRITEABLE"]: coordinates = np.array(coordinates, copy=True) if time is not None and not time.flags["WRITEABLE"]: time = np.array(time, copy=True) if cell_lengths is not None and not cell_lengths.flags["WRITEABLE"]: cell_lengths = np.array(cell_lengths, copy=True) if cell_angles is not None and not cell_angles.flags["WRITEABLE"]: cell_angles = np.array(cell_angles, copy=True) return coordinates, time, cell_lengths, cell_angles def write(self, coordinates, time=None, cell_lengths=None, cell_angles=None): """Write one or more frames of a molecular dynamics trajectory to disk in the AMBER NetCDF format. Parameters ---------- coordinates : np.ndarray, dtype=np.float32, shape=(n_frames, n_atoms, 3) The cartesian coordinates of each atom, in units of angstroms. time : np.ndarray, dtype=np.float32, shape=(n_frames), optional The time index corresponding to each frame, in units of picoseconds. cell_lengths : np.ndarray, dtype=np.double, shape=(n_frames, 3) The lengths (a,b,c) of the unit cell for each frame. cell_angles : np.ndarray, dtype=np.double, shape=(n_frames, 3) The angles (\alpha, \beta, \\gamma) defining the unit cell for each frame. Notes ----- If the input arrays are of dimension deficient by one, for example if the coordinates array is two dimensional, the time is a single scalar or cell_lengths and cell_angles are a 1d array of length three, that is okay. You'll simply be saving a single frame. """ self._validate_open() if self._mode not in ["w", "ws", "a", "as"]: raise OSError( "The file was opened in mode=%s. Writing is not allowed." % self._mode, ) coordinates = in_units_of(coordinates, None, "angstroms") time = in_units_of(time, None, "picoseconds") cell_lengths = in_units_of(cell_lengths, None, "angstroms") cell_angles = in_units_of(cell_angles, None, "degrees") # typecheck all of the input arguments rigorously coordinates = ensure_type( coordinates, np.float32, 3, "coordinates", length=None, can_be_none=False, shape=(None, None, 3), warn_on_cast=False, add_newaxis_on_deficient_ndim=True, ) n_frames, n_atoms = coordinates.shape[0], coordinates.shape[1] time = ensure_type( time, np.float32, 1, "time", length=n_frames, can_be_none=True, warn_on_cast=False, add_newaxis_on_deficient_ndim=True, ) cell_lengths = ensure_type( cell_lengths, np.float64, 2, "cell_lengths", length=n_frames, can_be_none=True, shape=(n_frames, 3), warn_on_cast=False, add_newaxis_on_deficient_ndim=True, ) cell_angles = ensure_type( cell_angles, np.float64, 2, "cell_angles", length=n_frames, can_be_none=True, shape=(n_frames, 3), warn_on_cast=False, add_newaxis_on_deficient_ndim=True, ) # are we dealing with a periodic system? if (cell_lengths is None and cell_angles is not None) or (cell_lengths is not None and cell_angles is None): provided, neglected = "cell_lengths", "cell_angles" if cell_lengths is None: provided, neglected = neglected, provided raise ValueError( f'You provided the variable "{provided}", but neglected to ' f'provide "{neglected}". They either BOTH must be provided, or ' "neither. Having one without the other is meaningless", ) if self._needs_initialization: self._initialize_headers( n_atoms=n_atoms, set_coordinates=True, set_time=(time is not None), set_cell=(cell_lengths is not None and cell_angles is not None), ) self._needs_initialization = False # this slice object says where we're going to put the data in the # arrays frame_slice = slice(self._frame_index, self._frame_index + n_frames) # deposit the data try: self._handle.variables["coordinates"][frame_slice, :, :] = coordinates if time is not None: self._handle.variables["time"][frame_slice] = time if cell_lengths is not None: self._handle.variables["cell_lengths"][frame_slice, :] = cell_lengths if cell_angles is not None: self._handle.variables["cell_angles"][frame_slice, :] = cell_angles except KeyError as e: raise ValueError( "The file that you're trying to save to doesn't " "contain the field %s." % str(e), ) # check for missing attributes missing = None if time is None and "time" in self._handle.variables: missing = "time" elif cell_angles is None and "cell_angles" in self._handle.variables: missing = "cell_angles" elif cell_lengths is None and "cell_lengths" in self._handle.variables: missing = "cell_lengths" if missing is not None: raise ValueError( "The file that you're saving to expects each frame " "to contain %s information, but you did not supply it." "I don't allow 'ragged' arrays." % missing, ) # update the frame index pointers. this should be done at the # end so that if anything errors out, we don't actually get here self._frame_index += n_frames def flush(self): "Write all buffered data in the to the disk file." self._validate_open() self._handle.sync() def _initialize_headers(self, set_coordinates, n_atoms, set_time, set_cell): """Initialize the NetCDF file according to the AMBER NetCDF Convention, Version 1.0, revision B. The convention is defined here: http://ambermd.org/netcdf/nctraj.xhtml """ # Set attributes. setattr( self._handle, "title", f"CREATED at {datetime.now()} on {socket.gethostname()}", ) setattr(self._handle, "application", "Omnia") setattr(self._handle, "program", "MDTraj") setattr(self._handle, "programVersion", mdtraj.__version__) setattr(self._handle, "Conventions", "AMBER") setattr(self._handle, "ConventionVersion", "1.0") # set the dimensions # unlimited number of frames in trajectory self._handle.createDimension("frame", 0) # number of spatial coordinates self._handle.createDimension("spatial", 3) # number of atoms self._handle.createDimension("atom", n_atoms) if set_cell: # three spatial coordinates for the length of the unit cell self._handle.createDimension("cell_spatial", 3) # three spatial coordinates for the angles that define the shape # of the unit cell self._handle.createDimension("cell_angular", 3) # length of the longest string used for a label self._handle.createDimension("label", 5) # Define variables to store unit cell data cell_lengths = self._handle.createVariable( "cell_lengths", "d", ("frame", "cell_spatial"), ) setattr(cell_lengths, "units", "angstrom") cell_angles = self._handle.createVariable( "cell_angles", "d", ("frame", "cell_angular"), ) setattr(cell_angles, "units", "degree") self._handle.createVariable("cell_spatial", "c", ("cell_spatial",)) self._handle.variables["cell_spatial"][0] = "a" self._handle.variables["cell_spatial"][1] = "b" self._handle.variables["cell_spatial"][2] = "c" self._handle.createVariable("cell_angular", "c", ("cell_spatial", "label")) self._handle.variables["cell_angular"][0] = "alpha" self._handle.variables["cell_angular"][1] = "beta " self._handle.variables["cell_angular"][2] = "gamma" if set_time: # Define coordinates and snapshot times. frame_times = self._handle.createVariable("time", "f", ("frame",)) setattr(frame_times, "units", "picosecond") if set_coordinates: frame_coordinates = self._handle.createVariable( "coordinates", "f", ("frame", "atom", "spatial"), ) setattr(frame_coordinates, "units", "angstrom") self._handle.createVariable("spatial", "c", ("spatial",)) self._handle.variables["spatial"][0] = "x" self._handle.variables["spatial"][1] = "y" self._handle.variables["spatial"][2] = "z" def seek(self, offset, whence=0): """Move to a new file position Parameters ---------- offset : int A number of frames. whence : {0, 1, 2} 0: offset from start of file, offset should be >=0. 1: move relative to the current position, positive or negative 2: move relative to the end of file, offset should be <= 0. Seeking beyond the end of a file is not supported """ if whence == 0 and offset >= 0: self._frame_index = offset elif whence == 1: self._frame_index = self._frame_index + offset elif whence == 2 and offset <= 0: self._frame_index = self.n_frames + offset else: raise OSError("Invalid argument") def tell(self): """Current file position Returns ------- offset : int The current frame in the file. """ return int(self._frame_index) def close(self): """Close the NetCDF file handle""" if not self._closed and hasattr(self, "_handle"): self._handle.close() self._closed = True def __enter__(self): # supports the context manager protocol return self def __exit__(self, *exc_info): # supports the context manager protocol self.close() def __del__(self): self.close() def __len__(self): if self._closed: raise ValueError("I/O operation on closed file") return self.n_frames