# -*- coding: utf-8 -*- import abc import numpy as np from copy import copy, deepcopy from inspect import ismethod # from datetime import datetime # from math import pi,sqrt,exp,floor # from scipy.special import erf # import scipy.optimize as opt import os, sys, subprocess ## Abstract classes class Transformable(): def __init__(self, position, orientation=None): self.position = np.array(position) if orientation is not None: orientation = np.array(orientation) self.orientation = orientation def transform(self, R = ((1,0,0),(0,1,0),(0,0,1)), center = (0,0,0), offset = (0,0,0)): R,center,offset = [np.array(x) for x in (R,center,offset)] self.position = R.dot(self.position-center)+center+offset if self.orientation is not None: ## TODO: what if self.orientation is taken from parent?! self.orientation = self.orientation.dot(R) ... def collapsedPosition(self): # print("collapsedPosition called", type(self), self.name) if isinstance(self, Child): # print(self.parent, isinstance(self.parent,Transformable)) if isinstance(self.parent, Transformable): return self.applyOrientation(self.position) + self.parent.collapsedPosition() # if self.parent.orientation is not None: # return self.parent.collapsedOrientation().dot(self.position) + self.parent.collapsedPosition() return np.array(self.position) # return a copy def applyOrientation(self,obj): # print("applyOrientation called", self.name, obj) if isinstance(self, Child): # print( self.orientation, self.orientation is not None, None is not None ) # if self.orientation is not None: # # print("applyOrientation applying", self, self.name, self.orientation) # obj = self.orientation.dot(obj) if isinstance(self.parent, Transformable): if self.parent.orientation is not None: obj = self.parent.orientation.dot(obj) obj = self.parent.applyOrientation(obj) # print("applyOrientation returning", self.name, obj) return obj class Parent(): def __init__(self, children=[]): self.children = [] for x in children: self.add(x) # self.children = children self.bonds = [] self.angles = [] self.dihedrals = [] self.exclusions = [] ## TODO: self.cacheInvalid = True # What will be in the cache? def add(self,x): ## TODO: check the parent-child tree to make sure there are no cycles if not isinstance(x,Child): raise Exception('Attempted to add an object to a group that does not inherit from the "Child" type') if x.parent is not None and x.parent is not self: raise Exception("Child {} already belongs to some group".format(x)) x.parent = self self.children.append(x) def remove(self,x): if x in self.children: self.children.remove(x) x.parent = None def add_bond(self, i,j, bond, exclude=False): ## TODO: how to handle duplicating and cloning bonds beads = [b for b in self] for b in (i,j): assert(b in beads) self.bonds.append( (i,j, bond, exclude) ) def add_angle(self, i,j,k, angle): beads = [b for b in self] for b in (i,j,k): assert(b in beads) self.angles.append( (i,j,k, angle) ) def add_dihedral(self, i,j,k,l, dihedral): beads = [b for b in self] for b in (i,j,k,l): assert(b in beads) self.dihedrals.append( (i,j,k,l, dihedral) ) def add_exclusion(self, i,j): ## TODO: how to handle duplicating and cloning bonds beads = [b for b in self] for b in (i,j): assert(b in beads) self.exclusions.append( (i,j) ) def get_bonds(self): ret = self.bonds for c in self.children: if isinstance(c,Parent): ret.extend( c.get_bonds() ) return ret def get_angles(self): ret = self.angles for c in self.children: if isinstance(c,Parent): ret.extend( c.get_angles() ) return ret def get_dihedrals(self): ret = self.dihedrals for c in self.children: if isinstance(c,Parent): ret.extend( c.get_dihedrals() ) return ret def get_exclusions(self): ret = self.exclusions for c in self.children: if isinstance(c,Parent): ret.extend( c.get_exclusions() ) return ret def __iter__(self): ## TODO: decide if this is the nicest way to do it! """Depth-first iteration through tree""" for x in self.children: if isinstance(x,Parent): if isinstance(x,Clone) and not isinstance(x.get_original_recursively(),Parent): yield x else: for y in x: yield y else: yield x def __len__(self): l = 0 for x in self.children: if isinstance(x,Parent): l += len(x) else: l += 1 return l def __getitem__(self, i): return self.children[i] def __setitem__(self, i, val): x = self.children[i] x.parent = None val.parent = self self.children[i] = val class Child(): def __init__(self, parent=None): self.parent = parent def __getattr__(self, name): """ Try to get attribute from the parent """ # if self.parent is not None: if "parent" not in self.__dict__ or self.__dict__["parent"] is None: raise AttributeError("'{}' object has no attribute '{}'".format(type(self).__name__, name)) # if ismethod(ret): # raise AttributeError("Is Method!") return getattr(self.parent,name) # def __getstate__(self): # print("Child getstate called", self) # print(self.__dict__) # return (self.__dict__,) # def __setstate__(self, state): # self.__dict__, = state class Clone(Transformable, Parent, Child): def __init__(self, original, parent=None, position = None, orientation = None): if position is None and original.position is not None: position = np.array( original.position ) if orientation is None and original.orientation is not None: orientation = np.array( original.orientation ) if parent is None: parent = original.parent self.original = original Child.__init__(self, parent) Transformable.__init__(self, position, orientation) ## TODO: keep own bond_list, etc, update when needed original changes if "children" in original.__dict__ and len(original.children) > 0: self.children = [Clone(c, parent = self) for c in original.children] else: self.children = [] def get_original_recursively(self): if isinstance(self.original, Clone): return self.original.get_original_recursively() else: return self.original def __getattr__(self, name): """ Try to get attribute from the original without descending the tree heirarchy, then look up parent TODO: handle PointParticle lookups into ParticleType """ # print("Clone getattr",name) if name in self.original.__dict__: return self.original.__dict__[name] else: if "parent" not in self.__dict__ or self.__dict__["parent"] is None: raise AttributeError("'{}' object has no attribute '{}'".format(type(self).__name__, name)) return getattr(self.parent, name) ## Particle classes class ParticleType(): """Class that hold common attributes that particles can point to""" excludedAttributes = ("idx","type_", "position", "parent", "excludedAttributes", ) def __init__(self, name, charge=0, **kargs): # parent = None # if "parent" in kargs: parent = kargs["parent"] # Child.__init__(self, parent) self.name = name self.charge = charge for key in ParticleType.excludedAttributes: assert( key not in kargs ) for key,val in kargs.items(): self.__dict__[key] = val def __hash_key(self): l = [self.name,self.charge] for keyval in sorted(self.__dict__.items()): l.extend(keyval) return tuple(l) def __hash__(self): return hash(self.__hash_key()) def _equal_check(a,b): if a.name == b.name: if a.__hash_key() != b.__hash_key(): raise Exception("Badly formed ParticleTypes have 'name' collision") def __eq__(a,b): a._equal_check(b) return a.name == b.name def __lt__(a,b): a._equal_check(b) return a.name < b.name def __le__(a,b): a._equal_check(b) return a.name <= b.name def __gt__(a,b): a._equal_check(b) return a.name > b.name def __ge__(a,b): a._equal_check(b) return a.name >= b.name class PointParticle(Transformable, Child): def __init__(self, type_, position, name="A", segname="A", **kwargs): Child.__init__(self, parent=None) Transformable.__init__(self,position) self.type_ = type_ self.idx = None self.segname = segname self.name = name self.counter = 0 for key,val in kwargs.items(): self.__dict__[key] = val def __getattr__(self, name): """ First try to get attribute from the parent, then type_ Note that this data structure seems to be fragile, can result in stack overflow """ # return Child.__getattr__(self,name) try: return Child.__getattr__(self,name) except Exception as e: if 'type_' in self.__dict__: return getattr(self.type_, name) else: raise AttributeError("'{}' object has no attribute '{}'".format(type(self).__name__, name)) class Group(Transformable, Parent, Child): def __init__(self, name=None, children = [], parent=None, position = np.array((0,0,0)), orientation = np.array(((1,0,0),(0,1,0),(0,0,1))), ): Transformable.__init__(self, position, orientation) Child.__init__(self, parent) # Initialize Child first Parent.__init__(self, children) self.name = name self.isClone = False def clone(self): return Clone(self) g = copy(self) g.isClone = True # TODO: use? g.children = [copy(c) for c in g.children] for c in g.children: c.parent = g return g g = Group(position = self.position, orientation = self.orientation) g.children = self.children # lists point to the same object def duplicate(self): return deepcopy(self) # Group(position = self.position, # orientation = self.orientation) # g.children = deepcopy self.children.deepcopy() # lists are the same object # def __getstate__(self): # return (self.children, self.parent, self.position, self.orientation) # def __setstate__(self, state): # self.children, self.parent, self.position, self.orientation = state class PdbModel(Transformable, Parent): def __init__(self, children=[], dimensions=None): Transformable.__init__(self,(0,0,0)) Parent.__init__(self, children) self.dimensions = dimensions self.particles = [p for p in self] self.cacheInvalid = True def _updateParticleOrder(self): pass def writePdb(self, filename): if self.cacheInvalid: self._updateParticleOrder() with open(filename,'w') as fh: ## Write header fh.write("CRYST1 {:>5f} {:>5f} {:>5f} 90.00 90.00 90.00 P 1 1\n".format( *self.dimensions )) ## Write coordinates formatString = "ATOM {:>5d} {:^4s}{:1s}{:3s} {:1s}{:>5s} {:8.3f}{:8.3f}{:8.3f}{:6.2f}{:6.2f}{:2s}{:2f}\n" for p in self.particles: ## http://www.wwpdb.org/documentation/file-format-content/format33/sect9.html#ATOM idx = p.idx+1 name = p.type_.name resname = name[:3] chain = "A" charge = 0 occ = 0 beta = 0 x,y,z = [x for x in p.collapsedPosition()] assert(idx < 1e5) resid = "{:<4d}".format(idx) fh.write( formatString.format( idx, name[:1], "", resname, chain, resid, x, y, z, occ, beta, "", charge )) return def writePsf(self, filename): if self.cacheUpToDate == False: self._updateParticleOrder() with open(filename,'w') as fh: ## Write header fh.write("PSF NAMD\n\n") # create NAMD formatted psf ## ATOMS section fh.write("{:>8d} !NATOM\n".format(len(self.particles))) ## From vmd/plugins/molfile_plugin/src/psfplugin.c ## "%d %7s %10s %7s %7s %7s %f %f" formatString = "{idx:>8d} {segname:7s} {resid:<10s} {resname:7s}" + \ " {name:7s} {type:7s} {charge:f} {mass:f}\n" for p in self.particles: idx = p.idx + 1 data = dict( idx = idx, segname = "A", resid = "%d%c%c" % (idx," "," "), # TODO: work with large indeces name = p.type_.name[:1], resname = p.type_.name[:3], type = p.type_.name[:1], charge = p.charge, mass = p.mass ) fh.write(formatString.format( **data )) fh.write("\n") ## Write out bonds bonds = self.get_bonds() fh.write("{:>8d} !NBOND\n".format(len(bonds))) counter = 0 for p1,p2,b,ex in bonds: fh.write( "{:d} {:d} ".format(p1.idx+1,p2.idx+1) ) counter += 1 if counter == 3: fh.write("\n") counter = 0 fh.write("\n") ## TODO: Write out angles return class ArbdModel(PdbModel): def __init__(self, children, dimensions=(1000,1000,1000), temperature=291, timestep=50e-6, cutoff=50, decompPeriod=10000, pairlistDistance=None, nonbondedResolution=0.1): PdbModel.__init__(self, children, dimensions) self.temperature = temperature self.timestep = timestep self.cutoff = cutoff if pairlistDistance == None: pairlistDistance = cutoff+2 self.decompPeriod = decompPeriod self.pairlistDistance = pairlistDistance self.numParticles = 0 self.particles = [] self.type_counts = None self.nbSchemes = [] self._nbParamFiles = [] # This could be made more robust self.nbResolution = 0.1 self._written_bond_files = dict() self.cacheUpToDate = False def _getNbScheme(self, typeA, typeB): scheme = None for s,A,B in self.nbSchemes: if A is None or B is None: if A is None and B is None: return s elif A is None and typeB == B: return s elif B is None and typeA == A: return s elif typeA == A and typeB == B: return s raise Exception("No nonbonded scheme found for %s and %s" % (typeA.name, typeB.name)) def _countParticleTypes(self): ## TODO: check for modifications to particle that require ## automatic generation of new particle type type_counts = dict() for p in self: t = p.type_ if t in type_counts: type_counts[t] += 1 else: type_counts[t] = 1 self.type_counts = type_counts def _updateParticleOrder(self): ## Create ordered list particles = [p for p in self] self.particles = sorted(particles, key=lambda p: (p.type_, p.idx)) ## Update particle indices for p,i in zip(self.particles,range(len(self.particles))): p.idx = i # self.initialCoords = np.array([p.initialPosition for p in self.particles]) def useNonbondedScheme(self, nbScheme, typeA=None, typeB=None): self.nbSchemes.append( (nbScheme, typeA, typeB) ) if typeA != typeB: self.nbSchemes.append( (nbScheme, typeB, typeA) ) def simulate(self, outputPrefix, outputDirectory='output', numSteps=100000000, timestep=100e-6, gpu=0, outputPeriod=1e4, arbd=None): assert(type(gpu) is int) numSteps = int(numSteps) if self.cacheUpToDate == False: self._countParticleTypes() self._updateParticleOrder() if outputDirectory == '': outputDirectory='.' if arbd is None: for path in os.environ["PATH"].split(os.pathsep): path = path.strip('"') fname = os.path.join(path, "arbd") if os.path.isfile(fname) and os.access(fname, os.X_OK): arbd = fname break if arbd is None: raise Exception("ARBD was not found") if not os.path.exists(arbd): raise Exception("ARBD was not found") if not os.path.isfile(arbd): raise Exception("ARBD was not found") if not os.access(arbd, os.X_OK): raise Exception("ARBD is not executable") if not os.path.exists(outputDirectory): os.makedirs(outputDirectory) elif not os.path.isdir(outputDirectory): raise Exception("outputDirectory '%s' is not a directory!" % outputDirectory) self.writePdb( outputPrefix + ".pdb" ) self.writePsf( outputPrefix + ".psf" ) self.writeArbdFiles( outputPrefix, numSteps=numSteps, outputPeriod=outputPeriod ) ## http://stackoverflow.com/questions/18421757/live-output-from-subprocess-command cmd = (arbd, '-g', "%d" % gpu, "%s.bd" % outputPrefix, "%s/%s" % (outputDirectory, outputPrefix)) cmd = tuple(str(x) for x in cmd) print("Running ARBD with: %s" % " ".join(cmd)) process = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True) for line in process.stdout: sys.stdout.write(line) sys.stdout.flush() # -------------------------- # # Methods for printing model # # -------------------------- # def writeArbdFiles(self, prefix, numSteps=100000000, outputPeriod=10000): ## TODO: save and reference directories and prefixes using member data d = "potentials" self._bond_filename = "%s/%s.bonds.txt" % (d, prefix) self._angle_filename = "%s/%s.angles.txt" % (d, prefix) self._dihedral_filename = "%s/%s.dihedrals.txt" % (d, prefix) self._exclusion_filename = "%s/%s.exculsions.txt" % (d, prefix) self._writeArbdCoordFile( prefix + ".coord.txt" ) self._writeArbdBondFile() self._writeArbdAngleFile() self._writeArbdDihedralFile() self._writeArbdExclusionFile() self._writeArbdPotentialFiles( prefix, directory = d ) self._writeArbdConf( prefix, numSteps=numSteps, outputPeriod=outputPeriod ) def _writeArbdCoordFile(self, filename): with open(filename,'w') as fh: for p in self.particles: fh.write("%f %f %f\n" % tuple(x for x in p.collapsedPosition())) def _writeArbdConf(self, prefix, randomSeed=None, numSteps=100000000, outputPeriod=10000, restartCoordinateFile=None): ## TODO: raise exception if _writeArbdPotentialFiles has not been called filename = "%s.bd" % prefix ## Prepare a dictionary to fill in placeholders in the configuration file params = self.__dict__.copy() # get parameters from System object if randomSeed is None: params['randomSeed'] = "" else: params['randomSeed'] = "seed %s" % randomSeed params['numSteps'] = int(numSteps) params['coordinateFile'] = "%s.coord.txt" % prefix if restartCoordinateFile is None: params['restartCoordinates'] = "" else: params['restartCoordinates'] = "restartCoordinates %s" % restartCoordinateFile params['outputPeriod'] = outputPeriod for k,v in zip('XYZ', self.dimensions): params['origin'+k] = -v*0.5 params['dim'+k] = v ## Actually write the file with open(filename,'w') as fh: fh.write("""{randomSeed} timestep {timestep} steps {numSteps} numberFluct 0 # deprecated interparticleForce 1 # other values deprecated fullLongRange 0 # deprecated temperature {temperature} outputPeriod {outputPeriod} ## Energy doesn't actually get printed! outputEnergyPeriod {outputPeriod} outputFormat dcd ## Infrequent domain decomposition because this kernel is still very slow decompPeriod {decompPeriod} cutoff {cutoff} pairlistDistance {pairlistDistance} origin {originX} {originY} {originZ} systemSize {dimX} {dimY} {dimZ} \n""".format(**params)) ## Write entries for each type of particle for pt,num in self.getParticleTypesAndCounts(): particleParams = pt.__dict__.copy() particleParams['num'] = num fh.write(""" particle {name} num {num} diffusion {diffusivity} """.format(**particleParams)) if 'grid' in particleParams: for g in pt['grid']: fh.write("gridFile {}\n".format(g)) else: fh.write("gridFile null.dx\n") ## Write coordinates and interactions fh.write(""" ## Input coordinates inputCoordinates {coordinateFile} {restartCoordinates} ## Interaction potentials tabulatedPotential 1 ## The i@j@file syntax means particle type i will have NB interactions with particle type j using the potential in file """.format(**params)) for pair,f in zip(self._particleTypePairIter(), self._nbParamFiles): i,j,t1,t2 = pair fh.write("tabulatedFile %d@%d@%s\n" % (i,j,f)) ## Bonded interactions bonds = self.get_bonds() angles = self.get_angles() dihedrals = self.get_dihedrals() exclusions = self.get_exclusions() if len(bonds) > 0: for b in list(set([b for i,j,b,ex in bonds])): fh.write("tabulatedBondFile %s\n" % b) if len(angles) > 0: for b in list(set([b for i,j,k,b in angles])): fh.write("tabulatedAngleFile %s\n" % b) if len(dihedrals) > 0: for b in list(set([b for i,j,k,l,b in dihedrals])): fh.write("tabulatedDihedralFile %s\n" % b) fh.write("inputBonds %s\n" % self._bond_filename) fh.write("inputAngles %s\n" % self._angle_filename) fh.write("inputDihedrals %s\n" % self._dihedral_filename) fh.write("inputExcludes %s\n" % self._exclusion_filename) write_null_dx = False for pt,num in self.getParticleTypesAndCounts(): if "grid" not in pt.__dict__: with open("null.dx",'w') as fh: fh.write("""object 1 class gridpositions counts 2 2 2 origin {originX} {originY} {originZ} delta {dimX} 0.000000 0.000000 delta 0.000000 {dimY} 0.000000 delta 0.000000 0.000000 {dimZ} object 2 class gridconnections counts 2 2 2 object 3 class array type float rank 0 items 8 data follows 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 attribute "dep" string "positions" object "density" class field component "positions" value 1 component "connections" value 2 component "data" value 3 """.format(**params)) break def getParticleTypesAndCounts(self): ## TODO: remove(?) return sorted( self.type_counts.items(), key=lambda x: x[0] ) def _particleTypePairIter(self): typesAndCounts = self.getParticleTypesAndCounts() for i in range(len(typesAndCounts)): t1 = typesAndCounts[i][0] for j in range(i,len(typesAndCounts)): t2 = typesAndCounts[j][0] yield( (i,j,t1,t2) ) def _writeArbdPotentialFiles(self, prefix, directory = "potentials"): try: os.makedirs(directory) except OSError: if not os.path.isdir(directory): raise pathPrefix = "%s/%s" % (directory,prefix) self._writeNonbondedParameterFiles( pathPrefix + "-nb" ) # self._writeBondParameterFiles( pathPrefix ) # self._writeAngleParameterFiles( pathPrefix ) # self._writeDihedralParameterFiles( pathPrefix ) def _writeNonbondedParameterFiles(self, prefix): x = np.arange(0, self.cutoff, self.nbResolution) for i,j,t1,t2 in self._particleTypePairIter(): f = "%s.%s-%s.dat" % (prefix, t1.name, t2.name) scheme = self._getNbScheme(t1,t2) scheme.write_file(f, t1, t2, rMax = self.cutoff) self._nbParamFiles.append(f) def _getNonbondedPotential(self,x,a,b): return a*(np.exp(-x/b)) def _writeArbdBondFile( self ): for b in list( set( [b for i,j,b,ex in self.get_bonds()] ) ): if type(b) is not str: b.write_file() with open(self._bond_filename,'w') as fh: for i,j,b,ex in self.get_bonds(): item = (i.idx, j.idx, str(b)) if ex: fh.write("BOND REPLACE %d %d %s\n" % item) else: fh.write("BOND ADD %d %d %s\n" % item) def _writeArbdAngleFile( self ): for b in list( set( [b for i,j,k,b in self.get_angles()] ) ): if type(b) is not str: b.write_file() with open(self._angle_filename,'w') as fh: for b in self.get_angles(): item = tuple([p.idx for p in b[:-1]] + [str(b[-1])]) fh.write("ANGLE %d %d %d %s\n" % item) def _writeArbdDihedralFile( self ): for b in list( set( [b for i,j,k,l,b in self.get_dihedrals()] ) ): if type(b) is not str: b.write_file() with open(self._dihedral_filename,'w') as fh: for b in self.get_dihedrals(): item = tuple([p.idx for p in b[:-1]] + [str(b[-1])]) fh.write("DIHEDRAL %d %d %d %d %s\n" % item) def _writeArbdExclusionFile( self ): with open(self._exclusion_filename,'w') as fh: for ex in self.get_exclusions(): item = tuple(int(p.idx) for p in ex) fh.write("EXCLUDE %d %d\n" % item)