Source code for mdtraj.formats.hdf5

##############################################################################
# MDTraj: A Python Library for Loading, Saving, and Manipulating
#         Molecular Dynamics Trajectories.
# Copyright 2012-2013 Stanford University and the Authors
#
# Authors: Robert McGibbon
# Contributors:
#
# MDTraj is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 2.1
# of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with MDTraj. If not, see <http://www.gnu.org/licenses/>.
##############################################################################

"""
This module implements the MDTraj HDF5 format described at
https://github.com/mdtraj/mdtraj/wiki/HDF5-Trajectory-Format
"""

##############################################################################
# Imports
##############################################################################

import operator
import os
import warnings
from collections import namedtuple

try:
    import simplejson as json
except ImportError:
    import json

# 3rd party
import numpy as np

import mdtraj.core.element as elem

# ours
import mdtraj
from mdtraj.core.topology import Topology
from mdtraj.formats.registry import FormatRegistry
from mdtraj.utils import cast_indices, ensure_type, import_, in_units_of

__all__ = ["HDF5TrajectoryFile", "load_hdf5"]

Frames = namedtuple(
    "Frames",
    [
        "coordinates",
        "time",
        "cell_lengths",
        "cell_angles",
        "velocities",
        "kineticEnergy",
        "potentialEnergy",
        "temperature",
        "alchemicalLambda",
    ],
)

##############################################################################
# Code
##############################################################################


[docs] @FormatRegistry.register_loader(".h5") @FormatRegistry.register_loader(".hdf5") def load_hdf5(filename, stride=None, atom_indices=None, frame=None): """Load an MDTraj hdf5 trajectory file from disk. Parameters ---------- filename : path-like Path of HDF Trajectory file. stride : int, default=None Only read every stride-th frame atom_indices : array_like, optional If not none, then read only a subset of the atoms coordinates from the file. This may be slightly slower than the standard read because it requires an extra copy, but will save memory. frame : int, optional Use this option to load only a single frame from a trajectory on disk. If frame is None, the default, the entire trajectory will be loaded. If supplied, ``stride`` will be ignored. Examples -------- >>> import mdtraj as md >>> traj = md.load_hdf5('output.h5') >>> print traj <mdtraj.Trajectory with 500 frames, 423 atoms at 0x110740a90> >>> traj2 = md.load_hdf5('output.h5', stride=2, top='topology.pdb') >>> print traj2 <mdtraj.Trajectory with 250 frames, 423 atoms at 0x11136e410> Returns ------- trajectory : md.Trajectory The resulting trajectory, as an md.Trajectory object. See Also -------- mdtraj.HDF5TrajectoryFile : Low level interface to HDF5 files """ if not isinstance(filename, (str, os.PathLike)): raise TypeError( "filename must be of type path-like for load_lh5. " "you supplied %s" % type(filename), ) atom_indices = cast_indices(atom_indices) with HDF5TrajectoryFile(filename) as f: if frame is not None: f.seek(frame) n_frames = 1 else: n_frames = None return f.read_as_traj(n_frames=n_frames, stride=stride, atom_indices=atom_indices)
[docs] @FormatRegistry.register_fileobject(".h5") @FormatRegistry.register_fileobject(".hdf5") class HDF5TrajectoryFile: """Interface for reading and writing to a MDTraj HDF5 molecular dynamics trajectory file, whose format is described `here <https://github.com/rmcgibbo/mdtraj/issues/36>`_. This is a file-like object, that both reading or writing depending on the `mode` flag. It implements the context manager protocol, so you can also use it with the python 'with' statement. The format is extremely flexible and high performance. It can hold a wide variety of information about a trajectory, including fields like the temperature and energies. Because it's built on the fantastic HDF5 library, it's easily extensible too. Parameters ---------- filename : path-like Path to the file to open mode : {'r, 'w'} Mode in which to open the file. 'r' is for reading and 'w' is for writing force_overwrite : bool In mode='w', how do you want to behave if a file by the name of `filename` already exists? if `force_overwrite=True`, it will be overwritten. compression : {'zlib', None} Apply compression to the file? This will save space, and does not cost too many cpu cycles, so it's recommended. Attributes ---------- root title application topology randomState forcefield reference constraints See Also -------- mdtraj.load_hdf5 : High-level wrapper that returns a ``md.Trajectory`` """ distance_unit = "nanometers"
[docs] def __init__(self, filename, mode="r", force_overwrite=True, compression="zlib"): self._open = False # is the file handle currently open? self.mode = mode # the mode in which the file was opened? if mode not in ["r", "w", "a"]: raise ValueError("mode must be one of ['r', 'w', 'a']") if mode == "w" and not force_overwrite and os.path.exists(filename): raise OSError('"%s" already exists' % filename) # import tables self.tables = import_("tables") if compression == "zlib": compression = self.tables.Filters(complib="zlib", shuffle=True, complevel=1) elif compression is None: compression = None else: raise ValueError('compression must be either "zlib" or None') self._handle = self._open_file(filename, mode=mode, filters=compression) self._open = True if mode == "w": # what frame are we currently reading or writing at? self._frame_index = 0 # do we need to write the header information? self._needs_initialization = True if not os.fspath(filename).endswith(".h5"): warnings.warn("The .h5 extension is recommended.") elif mode == "a": try: self._frame_index = len(self._handle.root.coordinates) self._needs_initialization = False except self.tables.NoSuchNodeError: self._frame_index = 0 self._needs_initialization = True elif mode == "r": self._frame_index = 0 self._needs_initialization = False
@property def root(self): """Direct access to the root group of the underlying Tables HDF5 file handle. This can be used for random or specific access to the underlying arrays on disk """ _check_mode(self.mode, ("r", "a")) return self._handle.root ##################################################### # title global attribute (optional) ##################################################### @property def title(self): """User-defined title for the data represented in the file""" if hasattr(self._handle.root._v_attrs, "title"): return str(self._handle.root._v_attrs.title) return None @title.setter def title(self, value): """Set the user-defined title for the data represented in the file""" _check_mode(self.mode, ("w", "a")) self._handle.root._v_attrs.title = str(value) ##################################################### # application global attribute (optional) ##################################################### @property def application(self): "Suite of programs that created the file" if hasattr(self._handle.root._v_attrs, "application"): return str(self._handle.root._v_attrs.application) return None @application.setter def application(self, value): "Set the suite of programs that created the file" _check_mode(self.mode, ("w", "a")) self._handle.root._v_attrs.application = str(value) ##################################################### # topology global attribute (optional, recommended) ##################################################### @property def topology(self): """Get the topology out from the file Returns ------- topology : mdtraj.Topology A topology object """ try: raw = self._get_node("/", name="topology")[0] if not isinstance(raw, str): raw = raw.decode() topology_dict = json.loads(raw) except self.tables.NoSuchNodeError: return None topology = Topology() for chain_dict in sorted(topology_dict["chains"], key=operator.itemgetter("index")): chain = topology.add_chain() for residue_dict in sorted(chain_dict["residues"], key=operator.itemgetter("index")): try: resSeq = residue_dict["resSeq"] except KeyError: resSeq = None warnings.warn("No resSeq information found in HDF file, defaulting to zero-based indices") try: segment_id = residue_dict["segmentID"] except KeyError: segment_id = "" residue = topology.add_residue(residue_dict["name"], chain, resSeq=resSeq, segment_id=segment_id) for atom_dict in sorted(residue_dict["atoms"], key=operator.itemgetter("index")): try: element = elem.get_by_symbol(atom_dict["element"]) except KeyError: element = elem.virtual topology.add_atom(atom_dict["name"], element, residue) atoms = list(topology.atoms) for index1, index2 in topology_dict["bonds"]: topology.add_bond(atoms[index1], atoms[index2]) return topology @topology.setter def topology(self, topology_object): """Set the topology in the file Parameters ---------- topology_object : mdtraj.Topology A topology object """ _check_mode(self.mode, ("w", "a")) # we want to be able to handle the openmm Topology object # here too, so if it's not an mdtraj topology we'll just guess # that it's probably an openmm topology and convert if not isinstance(topology_object, Topology): topology_object = Topology.from_openmm(topology_object) try: topology_dict = { "chains": [], "bonds": [], } for chain in topology_object.chains: chain_dict = { "residues": [], "index": int(chain.index), } for residue in chain.residues: residue_dict = { "index": int(residue.index), "name": str(residue.name), "atoms": [], "resSeq": int(residue.resSeq), "segmentID": str(residue.segment_id), } for atom in residue.atoms: try: element_symbol_string = str(atom.element.symbol) except AttributeError: element_symbol_string = "" residue_dict["atoms"].append( { "index": int(atom.index), "name": str(atom.name), "element": element_symbol_string, }, ) chain_dict["residues"].append(residue_dict) topology_dict["chains"].append(chain_dict) for atom1, atom2 in topology_object.bonds: topology_dict["bonds"].append( [ int(atom1.index), int(atom2.index), ], ) except AttributeError as e: raise AttributeError( "topology_object fails to implement the" "chains() -> residue() -> atoms() and bond() protocol. " "Specifically, we encountered the following %s" % e, ) # actually set the tables try: self._remove_node(where="/", name="topology") except self.tables.NoSuchNodeError: pass data = json.dumps(topology_dict) if not isinstance(data, bytes): data = data.encode("ascii") self._handle.create_array(where="/", name="topology", obj=[data]) ##################################################### # randomState global attribute (optional) ##################################################### @property def randomState(self): "State of the creators internal random number generator at the start of the simulation" if hasattr(self._handle.root._v_attrs, "randomState"): return str(self._handle.root._v_attrs.randomState) return None @randomState.setter def randomState(self, value): "Set the state of the creators internal random number generator at the start of the simulation" _check_mode(self.mode, ("w", "a")) self._handle.root._v_attrs.randomState = str(value) ##################################################### # forcefield global attribute (optional) ##################################################### @property def forcefield(self): "Description of the hamiltonian used. A short, human readable string, like AMBER99sbildn." if hasattr(self._handle.root._v_attrs, "forcefield"): return str(self._handle.root._v_attrs.forcefield) return None @forcefield.setter def forcefield(self, value): "Set the description of the hamiltonian used. A short, human readable string, like AMBER99sbildn." _check_mode(self.mode, ("w", "a")) self._handle.root._v_attrs.forcefield = str(value) ##################################################### # reference global attribute (optional) ##################################################### @property def reference(self): "A published reference that documents the program or parameters used to generate the data" if hasattr(self._handle.root._v_attrs, "reference"): return str(self._handle.root._v_attrs.reference) return None @reference.setter def reference(self, value): "Set a published reference that documents the program or parameters used to generate the data" _check_mode(self.mode, ("w", "a")) self._handle.root._v_attrs.reference = str(value) ##################################################### # constraints array (optional) ##################################################### @property def constraints(self): """Constraints applied to the bond lengths Returns ------- constraints : {None, np.array, dtype=[('atom1', '<i4'), ('atom2', '<i4'), ('distance', '<f4')])} A one dimensional array with the a int, int, float type giving the index of the two atoms involved in the constraints and the distance of the constraint. If no constraint information is in the file, the return value is None. """ if hasattr(self._handle.root, "constraints"): return self._handle.root.constraints[:] return None @constraints.setter def constraints(self, value): """Set the constraints applied to bond lengths Returns ------- valu : np.array, dtype=[('atom1', '<i4'), ('atom2', '<i4'), ('distance', '<f4')]) A one dimensional array with the a int, int, float type giving the index of the two atoms involved in the constraints and the distance of the constraint. """ _check_mode(self.mode, ("w", "a")) dtype = np.dtype( [ ("atom1", np.int32), ("atom2", np.int32), ("distance", np.float32), ], ) if not value.dtype == dtype: raise ValueError( "Constraints must be an array with dtype=%s. " "currently, I don't do any casting" % dtype, ) if not hasattr(self._handle.root, "constraints"): self._create_table( where="/", name="constraints", description=dtype, ) self._handle.root.constraints.truncate(0) self._handle.root.constraints.append(value) ##################################################### # read/write methods for file-like behavior ##################################################### def read_as_traj(self, n_frames=None, stride=None, atom_indices=None): """Read a trajectory from the HDF5 file Parameters ---------- n_frames : {int, None} The number of frames to read. If not supplied, all of the remaining frames will be read. stride : {int, None} By default all of the frames will be read, but you can pass this flag to read a subset of of the data by grabbing only every `stride`-th frame from disk. atom_indices : {int, None} By default all of the atom will be read, but you can pass this flag to read only a subsets of the atoms for the `coordinates` and `velocities` fields. Note that you will have to carefully manage the indices and the offsets, since the `i`-th atom in the topology will not necessarily correspond to the `i`-th atom in your subset. Returns ------- trajectory : Trajectory A trajectory object containing the loaded portion of the file. """ _check_mode(self.mode, ("r",)) from mdtraj.core.trajectory import Trajectory topology = self.topology if atom_indices is not None: topology = topology.subset(atom_indices) data = self.read(n_frames=n_frames, stride=stride, atom_indices=atom_indices) if len(data) == 0: return Trajectory(xyz=np.zeros((0, topology.n_atoms, 3)), topology=topology) in_units_of(data.coordinates, self.distance_unit, Trajectory._distance_unit, inplace=True) in_units_of(data.cell_lengths, self.distance_unit, Trajectory._distance_unit, inplace=True) return Trajectory( xyz=data.coordinates, topology=topology, time=data.time, unitcell_lengths=data.cell_lengths, unitcell_angles=data.cell_angles, ) def read(self, n_frames=None, stride=None, atom_indices=None): """Read one or more frames of data from the file Parameters ---------- n_frames : {int, None} The number of frames to read. If not supplied, all of the remaining frames will be read. stride : {int, None} By default all of the frames will be read, but you can pass this flag to read a subset of of the data by grabbing only every `stride`-th frame from disk. atom_indices : {int, None} By default all of the atom will be read, but you can pass this flag to read only a subsets of the atoms for the `coordinates` and `velocities` fields. Note that you will have to carefully manage the indices and the offsets, since the `i`-th atom in the topology will not necessarily correspond to the `i`-th atom in your subset. Notes ----- If you'd like more flexible access to the data, that is available by using the pytables group directly, which is accessible via the `root` property on this class. Returns ------- frames : namedtuple The returned namedtuple will have the fields "coordinates", "time", "cell_lengths", "cell_angles", "velocities", "kineticEnergy", "potentialEnergy", "temperature" and "alchemicalLambda". Each of the fields in the returned namedtuple will either be a numpy array or None, dependening on if that data was saved in the trajectory. All of the data shall be n units of "nanometers", "picoseconds", "kelvin", "degrees" and "kilojoules_per_mole". """ _check_mode(self.mode, ("r",)) if n_frames is None: n_frames = np.inf if stride is not None: stride = int(stride) total_n_frames = len(self._handle.root.coordinates) frame_slice = slice(self._frame_index, min(self._frame_index + n_frames, total_n_frames), stride) if frame_slice.stop - frame_slice.start == 0: return [] if atom_indices is None: # get all of the atoms atom_slice = slice(None) else: atom_slice = ensure_type( atom_indices, dtype=int, ndim=1, name="atom_indices", warn_on_cast=False, ) if not np.all(atom_slice < self._handle.root.coordinates.shape[1]): raise ValueError( "As a zero-based index, the entries in " "atom_indices must all be less than the number of atoms " "in the trajectory, %d" % self._handle.root.coordinates.shape[1], ) if not np.all(atom_slice >= 0): raise ValueError( "The entries in atom_indices must be greater " "than or equal to zero", ) def get_field(name, slice, out_units, can_be_none=True): try: node = self._get_node(where="/", name=name) data = node.__getitem__(slice) in_units = node.attrs.units if not isinstance(in_units, str): in_units = in_units.decode() data = in_units_of(data, in_units, out_units) return data except self.tables.NoSuchNodeError: if can_be_none: return None raise frames = Frames( coordinates=get_field( "coordinates", (frame_slice, atom_slice, slice(None)), out_units="nanometers", can_be_none=False, ), time=get_field("time", frame_slice, out_units="picoseconds"), cell_lengths=get_field("cell_lengths", (frame_slice, slice(None)), out_units="nanometers"), cell_angles=get_field("cell_angles", (frame_slice, slice(None)), out_units="degrees"), velocities=get_field( "velocities", (frame_slice, atom_slice, slice(None)), out_units="nanometers/picosecond", ), kineticEnergy=get_field("kineticEnergy", frame_slice, out_units="kilojoules_per_mole"), potentialEnergy=get_field("potentialEnergy", frame_slice, out_units="kilojoules_per_mole"), temperature=get_field("temperature", frame_slice, out_units="kelvin"), alchemicalLambda=get_field("lambda", frame_slice, out_units="dimensionless"), ) self._frame_index += frame_slice.stop - frame_slice.start return frames def write( self, coordinates, time=None, cell_lengths=None, cell_angles=None, velocities=None, kineticEnergy=None, potentialEnergy=None, temperature=None, alchemicalLambda=None, ): """Write one or more frames of data to the file This method saves data that is associated with one or more simulation frames. Note that all of the arguments can either be raw numpy arrays or unitted arrays (with openmm.unit.Quantity). If the arrays are unittted, a unit conversion will be automatically done from the supplied units into the proper units for saving on disk. You won't have to worry about it. Furthermore, if you wish to save a single frame of simulation data, you can do so naturally, for instance by supplying a 2d array for the coordinates and a single float for the time. This "shape deficiency" will be recognized, and handled appropriately. Parameters ---------- coordinates : np.ndarray, shape=(n_frames, n_atoms, 3) The cartesian coordinates of the atoms to write. By convention, the lengths should be in units of nanometers. time : np.ndarray, shape=(n_frames,), optional You may optionally specify the simulation time, in picoseconds corresponding to each frame. cell_lengths : np.ndarray, shape=(n_frames, 3), dtype=float32, optional You may optionally specify the unitcell lengths. The length of the periodic box in each frame, in each direction, `a`, `b`, `c`. By convention the lengths should be in units of angstroms. cell_angles : np.ndarray, shape=(n_frames, 3), dtype=float32, optional You may optionally specify the unitcell angles in each frame. Organized analogously to cell_lengths. Gives the alpha, beta and gamma angles respectively. By convention, the angles should be in units of degrees. velocities : np.ndarray, shape=(n_frames, n_atoms, 3), optional You may optionally specify the cartesian components of the velocity for each atom in each frame. By convention, the velocities should be in units of nanometers / picosecond. kineticEnergy : np.ndarray, shape=(n_frames,), optional You may optionally specify the kinetic energy in each frame. By convention the kinetic energies should b in units of kilojoules per mole. potentialEnergy : np.ndarray, shape=(n_frames,), optional You may optionally specify the potential energy in each frame. By convention the kinetic energies should b in units of kilojoules per mole. temperature : np.ndarray, shape=(n_frames,), optional You may optionally specify the temperature in each frame. By convention the temperatures should b in units of Kelvin. alchemicalLambda : np.ndarray, shape=(n_frames,), optional You may optionally specify the alchemical lambda in each frame. These have no units, but are generally between zero and one. """ _check_mode(self.mode, ("w", "a")) # these must be either both present or both absent. since # we're going to throw an error if one is present w/o the other, # lets do it now. if cell_lengths is None and cell_angles is not None: raise ValueError("cell_lengths were given, but no cell_angles") if cell_lengths is not None and cell_angles is None: raise ValueError("cell_angles were given, but no cell_lengths") # if the input arrays are openmm.unit.Quantities, convert them # into md units. Note that this acts as a no-op if the user doesn't # have openmm.unit installed (e.g. they didn't install OpenMM) coordinates = in_units_of(coordinates, None, "nanometers") time = in_units_of(time, None, "picoseconds") cell_lengths = in_units_of(cell_lengths, None, "nanometers") cell_angles = in_units_of(cell_angles, None, "degrees") velocities = in_units_of(velocities, None, "nanometers/picosecond") kineticEnergy = in_units_of(kineticEnergy, None, "kilojoules_per_mole") potentialEnergy = in_units_of(potentialEnergy, None, "kilojoules_per_mole") temperature = in_units_of(temperature, None, "kelvin") alchemicalLambda = in_units_of(alchemicalLambda, None, "dimensionless") # do typechecking and shapechecking on the arrays # this ensure_type method has a lot of options, but basically it lets # us validate most aspects of the array. Also, we can upconvert # on defficent ndim, which means that if the user sends in a single # frame of data (i.e. coordinates is shape=(n_atoms, 3)), we can # realize that. obviously the default mode is that they want to # write multiple frames at a time, so the coordinate shape is # (n_frames, n_atoms, 3) coordinates = ensure_type( coordinates, dtype=np.float32, ndim=3, name="coordinates", shape=(None, None, 3), can_be_none=False, warn_on_cast=False, add_newaxis_on_deficient_ndim=True, ) ( n_frames, n_atoms, ) = coordinates.shape[0:2] time = ensure_type( time, dtype=np.float32, ndim=1, name="time", shape=(n_frames,), can_be_none=True, warn_on_cast=False, add_newaxis_on_deficient_ndim=True, ) cell_lengths = ensure_type( cell_lengths, dtype=np.float32, ndim=2, name="cell_lengths", shape=(n_frames, 3), can_be_none=True, warn_on_cast=False, add_newaxis_on_deficient_ndim=True, ) cell_angles = ensure_type( cell_angles, dtype=np.float32, ndim=2, name="cell_angles", shape=(n_frames, 3), can_be_none=True, warn_on_cast=False, add_newaxis_on_deficient_ndim=True, ) velocities = ensure_type( velocities, dtype=np.float32, ndim=3, name="velocities", shape=(n_frames, n_atoms, 3), can_be_none=True, warn_on_cast=False, add_newaxis_on_deficient_ndim=True, ) kineticEnergy = ensure_type( kineticEnergy, dtype=np.float32, ndim=1, name="kineticEnergy", shape=(n_frames,), can_be_none=True, warn_on_cast=False, add_newaxis_on_deficient_ndim=True, ) potentialEnergy = ensure_type( potentialEnergy, dtype=np.float32, ndim=1, name="potentialEnergy", shape=(n_frames,), can_be_none=True, warn_on_cast=False, add_newaxis_on_deficient_ndim=True, ) temperature = ensure_type( temperature, dtype=np.float32, ndim=1, name="temperature", shape=(n_frames,), can_be_none=True, warn_on_cast=False, add_newaxis_on_deficient_ndim=True, ) alchemicalLambda = ensure_type( alchemicalLambda, dtype=np.float32, ndim=1, name="alchemicalLambda", shape=(n_frames,), can_be_none=True, warn_on_cast=False, add_newaxis_on_deficient_ndim=True, ) # if this is our first call to write(), we need to create the headers # and the arrays in the underlying HDF5 file if self._needs_initialization: self._initialize_headers( n_atoms=n_atoms, set_coordinates=True, set_time=(time is not None), set_cell=(cell_lengths is not None or cell_angles is not None), set_velocities=(velocities is not None), set_kineticEnergy=(kineticEnergy is not None), set_potentialEnergy=(potentialEnergy is not None), set_temperature=(temperature is not None), set_alchemicalLambda=(alchemicalLambda is not None), ) self._needs_initialization = False # we need to check that that the entries that the user is trying # to save are actually fields in OUR file try: # try to get the nodes for all of the fields that we have # which are not None for name in [ "coordinates", "time", "cell_angles", "cell_lengths", "velocities", "kineticEnergy", "potentialEnergy", "temperature", ]: contents = locals()[name] if contents is not None: self._get_node(where="/", name=name).append(contents) if contents is None: # for each attribute that they're not saving, we want # to make sure the file doesn't explect it try: self._get_node(where="/", name=name) raise AssertionError() except self.tables.NoSuchNodeError: pass # lambda is different, since the name in the file is lambda # but the name in this python function is alchemicalLambda name = "lambda" if alchemicalLambda is not None: self._get_node(where="/", name=name).append(alchemicalLambda) else: try: self._get_node(where="/", name=name) raise AssertionError() except self.tables.NoSuchNodeError: pass except self.tables.NoSuchNodeError: raise ValueError( "The file that you're trying to save to doesn't " f"contain the field {name}. You can always save a new trajectory " "and have it contain this information, but I don't allow 'ragged' " f"arrays. If one frame is going to have {name} information, then I expect " "all of them to. So I can't save it for just these frames. Sorry " "about that :)", ) except AssertionError: raise ValueError( "The file that you're saving to expects each frame " f"to contain {name} information, but you did not supply it." "I don't allow 'ragged' arrays. If one frame is going " f"to have {name} information, then I expect all of them to. ", ) self._frame_index += n_frames self.flush() def _initialize_headers( self, n_atoms, set_coordinates, set_time, set_cell, set_velocities, set_kineticEnergy, set_potentialEnergy, set_temperature, set_alchemicalLambda, ): self._n_atoms = n_atoms self._handle.root._v_attrs.conventions = "Pande" self._handle.root._v_attrs.conventionVersion = "1.1" self._handle.root._v_attrs.program = "MDTraj" self._handle.root._v_attrs.programVersion = mdtraj.__version__ self._handle.root._v_attrs.title = "title" # if the client has not the title attribute themselves, we'll # set it to MDTraj as a default option. if not hasattr(self._handle.root._v_attrs, "application"): self._handle.root._v_attrs.application = "MDTraj" # create arrays that store frame level informat if set_coordinates: self._create_earray( where="/", name="coordinates", atom=self.tables.Float32Atom(), shape=(0, self._n_atoms, 3), ) self._handle.root.coordinates.attrs["units"] = "nanometers" if set_time: self._create_earray( where="/", name="time", atom=self.tables.Float32Atom(), shape=(0,), ) self._handle.root.time.attrs["units"] = "picoseconds" if set_cell: self._create_earray( where="/", name="cell_lengths", atom=self.tables.Float32Atom(), shape=(0, 3), ) self._create_earray( where="/", name="cell_angles", atom=self.tables.Float32Atom(), shape=(0, 3), ) self._handle.root.cell_lengths.attrs["units"] = "nanometers" self._handle.root.cell_angles.attrs["units"] = "degrees" if set_velocities: self._create_earray( where="/", name="velocities", atom=self.tables.Float32Atom(), shape=(0, self._n_atoms, 3), ) self._handle.root.velocities.attrs["units"] = "nanometers/picosecond" if set_kineticEnergy: self._create_earray( where="/", name="kineticEnergy", atom=self.tables.Float32Atom(), shape=(0,), ) self._handle.root.kineticEnergy.attrs["units"] = "kilojoules_per_mole" if set_potentialEnergy: self._create_earray( where="/", name="potentialEnergy", atom=self.tables.Float32Atom(), shape=(0,), ) self._handle.root.potentialEnergy.attrs["units"] = "kilojoules_per_mole" if set_temperature: self._create_earray( where="/", name="temperature", atom=self.tables.Float32Atom(), shape=(0,), ) self._handle.root.temperature.attrs["units"] = "kelvin" if set_alchemicalLambda: self._create_earray( where="/", name="lambda", atom=self.tables.Float32Atom(), shape=(0,), ) self._get_node("/", name="lambda").attrs["units"] = "dimensionless" def seek(self, offset, whence=0): """Move to a new file position Parameters ---------- offset : int A number of frames. whence : {0, 1, 2} 0: offset from start of file, offset should be >=0. 1: move relative to the current position, positive or negative 2: move relative to the end of file, offset should be <= 0. Seeking beyond the end of a file is not supported """ _check_mode(self.mode, ("r",)) if whence == 0 and offset >= 0: self._frame_index = offset elif whence == 1: self._frame_index = self._frame_index + offset elif whence == 2 and offset <= 0: self._frame_index = len(self._handle.root.coordinates) + offset else: raise OSError("Invalid argument") def tell(self): """Current file position Returns ------- offset : int The current frame in the file. """ return int(self._frame_index) def _validate(self): raise NotImplementedError() # check that all of the shapes are consistent # check that everything has units @property def _get_node(self): return self._handle.get_node @property def _create_earray(self): return self._handle.create_earray @property def _create_table(self): return self._handle.create_table @property def _remove_node(self): return self._handle.remove_node @property def _open_file(self): return self.tables.open_file def close(self): "Close the HDF5 file handle" if self._open: self._handle.close() self._open = False def flush(self): "Write all buffered data in the to the disk file." if self._open: self._handle.flush() def __del__(self): self.close() def __enter__(self): "Support the context manager protocol" return self def __exit__(self, *exc_info): "Support the context manager protocol" self.close() def __len__(self): "Number of frames in the file" if not self._open: raise ValueError("I/O operation on closed file") return len(self._handle.root.coordinates)
def _check_mode(m, modes): if m not in modes: raise ValueError( "This operation is only available when a file " 'is open in mode="%s".' % m, )