Commit fca5b912 authored by cmaffeo2

Added all-atom model

parent 4b50fbbc
@@ -2,11 +2,14 @@ import json
import numpy as np
import copy
from coords import rotationAboutAxis
from arbdmodel import Group, PointParticle, ParticleType
seqComplement = dict(A='T',G='C')
for x,y in list(seqComplement.items()): seqComplement[y] = x
atomic_types = dict()
def stringToIntTuples(string, tupleLen, offset):
l = string.split()
# for i in range(0,len(l),tupleLen):
@@ -16,42 +19,60 @@ def stringToIntTuples(string, tupleLen, offset):
return ret
class CanonicalNucleotide():
class CanonicalNucleotide(Group):
# DefaultOrientation = rotationAboutAxis([0,0,1], 68)
DefaultOrientation = rotationAboutAxis([0,0,1], 68+180)
def __init__(self, prefix, seq):
self.sequence = seq
Group.__init__(self, orientation = CanonicalNucleotide.DefaultOrientation)
## idx, type,
d = np.loadtxt("%s.dat" % prefix, dtype='i4,S4,S4,f4,f4,f4,f4,f4')
self.indices = d['f0']
self.props = dict()
for k1,k2 in zip(('charge','mass','x','y','z'),
('f3','f4','f5','f6','f7')):
self.props[k1] = d[k2]
for k1,k2 in zip(('name','type'),('f1','f2')):
self.props[k1] = [str(x,'utf-8') for x in d[k2]]
ids = d['f0']
assert( np.all(ids == sorted(ids)) )
names = d['f1']
tns = d['f2']
xs = d['f5']
ys = d['f6']
zs = d['f7']
type_list = []
for i in range(len(tns)):
tn = tns[i]
if tn not in atomic_types:
atomic_types[tn] = ParticleType(tn.decode('UTF-8'),
charge = d['f3'][i],
mass = d['f4'][i])
p = PointParticle( type_ = atomic_types[tn],
position = [xs[i],ys[i],zs[i]],
name = names[i].decode('UTF-8') )
self.children.append( p )
self.props['beta'] = np.ones(len(d['f1']))
# self.props['name'] = [str(x) for x in d['f1']]
# self.props['type'] = [str(x) for x in d['f2']]
minIdx = np.min(self.indices) - 1
b = np.loadtxt("%s.bonds.txt" % prefix, dtype='i4') - minIdx
self.bonds = np.array( [x for x in zip(b[::2],b[1::2])] )
atoms = self.children
self.atoms_by_name = {a.name:a for a in atoms}
a = np.loadtxt("%s.angles.txt" % prefix, dtype='i4') - minIdx
self.angles = np.array( [x for x in zip(a[::3],a[1::3],a[2::3])] )
# minIdx = np.min(self.indices) - 1
b = np.loadtxt("%s.bonds.txt" % prefix, dtype='i4') # - minIdx
for i,j in zip(b[::2],b[1::2]):
self.add_bond( atoms[i], atoms[j], None )
d = np.loadtxt("%s.dihedrals.txt" % prefix, dtype='i4') - minIdx
self.dihedrals = np.array( [x for x in zip(d[::4],d[1::4],d[2::4],d[3::4])] )
a = np.loadtxt("%s.angles.txt" % prefix, dtype='i4') # - minIdx
for i,j,k in zip(a[::3],a[1::3],a[2::3]):
self.add_angle( atoms[i], atoms[j], atoms[k], None )
i = np.loadtxt("%s.impropers.txt" % prefix, dtype='i4') - minIdx
self.impropers = np.array( [x for x in zip(i[::4],i[1::4],i[2::4],i[3::4])] )
self.indices = self.indices - minIdx
d = np.loadtxt("%s.dihedrals.txt" % prefix, dtype='i4')# - minIdx
for i,j,k,l in zip(d[::4],d[1::4],d[2::4],d[3::4]):
self.add_dihedral( atoms[i], atoms[j], atoms[k], atoms[l], None )
i = np.loadtxt("%s.impropers.txt" % prefix, dtype='i4')# - minIdx
for i,j,k,l in zip(d[::4],d[1::4],d[2::4],d[3::4]):
self.add_improper( atoms[i], atoms[j], atoms[k], atoms[l], None )
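## The per-nucleotide template files read above use a plain whitespace-separated
## layout; a hypothetical two-atom excerpt (names and values illustrative only):
##   <prefix>.dat:        1  P    P     1.50  30.974   0.000  0.000  0.000
##                        2  O1P  ON3  -0.78  15.999   1.480  0.000  0.000
##   <prefix>.bonds.txt:  0 1
## Columns of the .dat file map to idx, name, type, charge, mass, x, y, z; the
## bond/angle/dihedral/improper files are flat integer lists read in groups of
## 2/3/4/4. With the '- minIdx' shift commented out, those integers are assumed to
## be zero-based offsets into the atom list as loaded.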
## TODO: remove this!
def transformed(self, orientation, translation, scale=0.5, singleStranded=False):
# rotationAboutZ -= 68
c = copy.copy(self)
@@ -100,25 +121,6 @@ class CanonicalNucleotide():
return c
# def __iter__(self):
# for a in zip(self.props['name'], self.coords):
# yield a
def __len__(self):
return len(self.indices)
def index(self, name):
for i,n in zip(self.indices, self.props['name']):
if n == name: return i
raise Exception("No atomic index found for atom '%s'!" % name)
def atomicProps(self):
for i in range(len(self)):
tmp = dict()
for k,a in self.props.items():
tmp[k] = a[i]
yield tmp
canonicalNtFwd = dict()
direction = 'fwd'
for seq in seqComplement.keys():
@@ -66,6 +66,7 @@ class Parent():
self.bonds = []
self.angles = []
self.dihedrals = []
self.impropers = []
self.exclusions = []
## TODO: self.cacheInvalid = True # What will be in the cache?
@@ -89,6 +90,7 @@ class Parent():
self.bonds = []
self.angles = []
self.dihedrals = []
self.impropers = []
self.exclusions = []
def remove(self,x):
@@ -112,6 +114,11 @@ class Parent():
for b in (i,j,k,l): assert(b in beads)
self.dihedrals.append( (i,j,k,l, dihedral) )
def add_improper(self, i,j,k,l, dihedral):
beads = [b for b in self]
for b in (i,j,k,l): assert(b in beads)
self.impropers.append( (i,j,k,l, dihedral) )
def add_exclusion(self, i,j):
## TODO: how to handle duplicating and cloning bonds
## TODO: perform following check elsewhere
@@ -137,6 +144,12 @@ class Parent():
if isinstance(c,Parent): ret.extend( c.get_dihedrals() )
return ret
def get_impropers(self):
ret = self.impropers
for c in self.children:
if isinstance(c,Parent): ret.extend( c.get_impropers() )
return ret
def get_exclusions(self):
ret = self.exclusions
for c in self.children:
@@ -188,9 +201,16 @@ class Child():
# if self.parent is not None:
if "parent" not in self.__dict__ or self.__dict__["parent"] is None or name is "children":
raise AttributeError("'{}' object has no attribute '{}'".format(type(self).__name__, name))
# if ismethod(ret):
# raise AttributeError("Is Method!")
return getattr(self.parent,name)
## TODO: determine if there is a way to avoid __getattr__ if a method is being looked up
try:
ret = getattr(self.parent,name)
except:
raise AttributeError("'{}' object has no attribute '{}'".format(type(self).__name__, name))
if ismethod(ret):
raise AttributeError("'{}' object has no method '{}'".format(type(self).__name__, name))
return ret
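## In effect, attribute lookups that fail on a Child fall through to its parent
## (e.g., a hypothetical child.temperature resolving from the parent), while missing
## names and methods still raise AttributeError so hasattr()-style checks stay
## predictable. Note that ismethod here is inspect.ismethod and must be imported in
## this module.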
# def __getstate__(self):
# print("Child getstate called", self)
@@ -355,7 +375,10 @@ class Group(Transformable, Parent, Child):
g.children = self.children # lists point to the same object
def duplicate(self):
return deepcopy(self)
new = deepcopy(self)
for c in new.children:
c.parent = new
return new
# Group(position = self.position,
# orientation = self.orientation)
# g.children = deepcopy self.children.deepcopy() # lists are the same object
@@ -386,7 +409,7 @@ class PdbModel(Transformable, Parent):
self._updateParticleOrder()
with open(filename,'w') as fh:
## Write header
fh.write("CRYST1 {:>5f} {:>5f} {:>5f} 90.00 90.00 90.00 P 1 1\n".format( *self.dimensions ))
fh.write("CRYST1 {:>5f} {:>5f} {:>5f} 90.00 90.00 90.00 P 1 1\n".format( *self.dimensions ))
## Write coordinates
formatString = "ATOM {:>5d} {:^4s}{:1s}{:3s} {:1s}{:>5s} {:8.3f}{:8.3f}{:8.3f}{:6.2f}{:6.2f}{:2s}{:2f}\n"
@@ -414,6 +437,7 @@ class PdbModel(Transformable, Parent):
with open(filename,'w') as fh:
## Write header
fh.write("PSF NAMD\n\n") # create NAMD formatted psf
fh.write("{:>8d} !NTITLE\n\n".format(0))
## ATOMS section
fh.write("{:>8d} !NATOM\n".format(len(self.particles)))
@@ -449,9 +473,42 @@ class PdbModel(Transformable, Parent):
counter = 0
fh.write("\n")
## TODO: Write out angles
## Write out angles
angles = self.get_angles()
fh.write("{:>8d} !NTHETA\n".format(len(angles)))
counter = 0
for p1,p2,p3,a in angles:
fh.write( "{:d} {:d} {:d}".format(p1.idx+1,p2.idx+1,p3.idx+1) )
counter += 1
if counter == 2:
fh.write("\n")
counter = 0
fh.write("\n")
## Write out dihedrals
dihedrals = self.get_dihedrals()
fh.write("{:>8d} !NPHI\n".format(len(dihedrals)))
counter = 0
for p1,p2,p3,p4,a in dihedrals:
fh.write( "{:d} {:d} {:d} {:d}".format(p1.idx+1,p2.idx+1,p3.idx+1,p4.idx+1) )
counter += 1
if counter == 2:
fh.write("\n")
counter = 0
fh.write("\n")
## Write out impropers
impropers = self.get_impropers()
fh.write("{:>8d} !NIMPHI\n".format(len(impropers)))
counter = 0
for p1,p2,p3,p4,a in impropers:
fh.write( "{:d} {:d} {:d} {:d}".format(p1.idx+1,p2.idx+1,p3.idx+1,p4.idx+1) )
counter += 1
if counter == 2:
fh.write("\n")
counter = 0
fh.write("\n")
return
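## Roughly, with the fixed-width fields above, the angle block written for a system
## with three (hypothetical) angles would look like:
##        3 !NTHETA
##        1       2       3       2       3       4
##        3       4       5
## i.e. 1-based atom indices, two angles (six integers) per line, with analogous
## !NPHI and !NIMPHI blocks using four indices per entry.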
class ArbdModel(PdbModel):
def __init__(self, children, dimensions=(1000,1000,1000), temperature=291, timestep=50e-6, cutoff=50, decompPeriod=10000, pairlistDistance=None, nonbondedResolution=0.1):
@@ -809,3 +866,12 @@ component "data" value 3
for ex in self.get_exclusions():
item = tuple(int(p.idx) for p in ex)
fh.write("EXCLUDE %d %d\n" % item)
def atomic_simulate(self, outputPrefix, outputDirectory='output'):
if self.cacheUpToDate == False: # TODO: remove cache?
self._countParticleTypes()
self._updateParticleOrder()
if outputDirectory == '': outputDirectory='.'
self.writePdb( outputPrefix + ".pdb" )
self.writePsf( outputPrefix + ".psf" )
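## Hypothetical usage (model construction not shown):
##   model.atomic_simulate("all_atom")  # writes all_atom.pdb and all_atom.psf
## Note that, as written, outputDirectory only receives a default substitution; the
## output paths come solely from outputPrefix.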
@@ -121,14 +121,12 @@ def quaternion_from_matrix( R ):
d1 = 1+R[0,0]+R[1,1]+R[2,2]
d2 = 1+R[0,0]-R[1,1]-R[2,2]
d3 = 1-R[0,0]+R[1,1]-R[2,2]
d4 = 1-R[0,0]-R[1,1]+R[2,2]
# minD = min((d1,d2,d3,d3))
maxD = max((d1,d2))
## TODO: add more approaches TypeError: 0 < idim < 11 must hold
maxD = max((d1,d2,d3,d4))
d = 0.5 / np.sqrt(maxD)
if d1 == maxD:
return np.array(( 1.0/(4*d),
d * (R[2,1]-R[1,2]),
@@ -139,6 +137,16 @@ def quaternion_from_matrix( R ):
1.0/(4*d),
d * (R[0,1]+R[1,0]),
d * (R[0,2]+R[2,0]) ))
elif d3 == maxD:
return np.array(( d * (R[0,2]-R[2,0]),
d * (R[0,1]+R[1,0]),
1.0/(4*d),
d * (R[1,2]+R[2,1]) ))
elif d4 == maxD:
return np.array(( d * (R[1,0]-R[0,1]),
d * (R[0,2]+R[2,0]),
d * (R[1,2]+R[2,1]),
1.0/(4*d) ))
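## Background for the branch selection above: for a unit quaternion (w,x,y,z),
##   d1 = 1 + tr(R) = 4*w^2,   d2 = 4*x^2,   d3 = 4*y^2,   d4 = 4*z^2,
## so taking the largest of the four keeps 1.0/(4*d) = sqrt(maxD)/2 well-conditioned
## and avoids dividing by a near-zero quaternion component.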
def rotationAboutAxis(axis,angle, normalizeAxis=True):
if normalizeAxis: axis = axis / np.linalg.norm(axis)
@@ -154,3 +162,77 @@ def readArbdCoords(fname):
for line in fh:
coords.append([float(x) for x in line.split()[1:]])
return np.array(coords)
def readAvgArbdCoords(psf,pdb,dcd,rmsdThreshold=3.5):
import mdtraj as md
## align trajectory to pdb
ref = md.load(pdb, top=pdb)
sel = md.load(dcd, top=pdb)
# return ref.xyz[0,:,:]*10
# ref = sel[-1]
ids = ref.topology.select("name =~ 'D.*'")
assert(len(ids) > 3)
# r0 = ref.xyz[0,ids,:]
r0 = sel.xyz[-1,ids,:]
t = -1 # in case dcd_frames < 3
for t in range(len(sel.xyz)-2,-1,-1):
# print(t)
R,comA,comB = minimizeRmsd(sel.xyz[t,ids,:],r0)
sel.xyz[t,:,:] = np.array( [(r-comA).dot(R)+comB for r in sel.xyz[t]] )
rmsd = np.mean( (sel.xyz[t,ids,:]-r0)**2 )
if rmsd > (0.1*rmsdThreshold)**2:
break
t0=t+1
print( "Averaging coordinates in %s after frame %d" % (dcd, t0) )
pos = sel.xyz[t0:,:,:]
pos = np.mean(pos, axis=0)
return 10*pos # convert to Angstroms
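## Hypothetical usage (file names illustrative; requires the mdtraj package):
##   avg_coords = readAvgArbdCoords("model.psf", "model.pdb", "output/model.dcd")
## avg_coords holds the mean positions, in Angstroms, over the trailing frames that
## stay roughly within rmsdThreshold (Angstroms) of the final frame after alignment.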
# def readAvgArbdCoords(psf,pdb,dcd,rmsdThreshold=3.5):
# import MDAnalysis as md
# ## align trajectory to pdb
# from pdb import set_trace
# set_trace()
# uref = md.Universe(psf, pdb)
# usel = md.Universe(psf, dcd)
# ref = uref.select_atoms("name 'd*'")
# sel = usel.select_atoms("name 'd*'")
# # r0 = ref.xyz[0,ids,:]
# ts = usel.trajectory[-1]
# ts.frame
# r0 = sel.positions
# for t in range(ts.frame-1,-1,-1):
# print(t)
# usel.trajectory[t]
# R,comA,comB = minimizeRmsd(sel.positions,r0)
# sel.positions = np.array( [(r-comA).dot(R)+comB for r in sel.positions] )
# rmsd = np.mean( (sel.positions-r0)**2 )
# if rmsd > (0.1*rmsdThreshold)**2:
# break
# t0=t+1
# print( "Averaging coordinates in %s after frame %d" % (dcd, t0) )
# pos = sel.xyz[t0:,:,:]
# pos = np.mean(pos, axis=0)
# return 10*pos # convert to Angstroms
def unit_quat_conversions():
for axis in [[0,0,1],[1,1,1],[1,0,0],[-1,-2,0]]:
for angle in np.linspace(-180,180,10):
R = rotationAboutAxis(axis, angle)
R2 = quaternion_to_matrix( quaternion_from_matrix( R ) )
if not np.all( np.abs(R-R2) < 0.01 ):
import pdb
pdb.set_trace()
quaternion_to_matrix( quaternion_from_matrix( R ) )
if __name__ == "__main__":
unit_quat_conversions()
@@ -9,11 +9,13 @@ from scipy.special import erf
import scipy.optimize as opt
from scipy import interpolate
import types
from CanonicalNucleotideAtoms import canonicalNtFwd, canonicalNtRev, seqComplement
from CanonicalNucleotideAtoms import enmTemplateHC, enmTemplateSQ, enmCorrectionsHC
import pdb
"""
TODO:
- fix handling of crossovers for atomic representation
+ fix handling of crossovers for atomic representation
- map to atomic representation
- remove performance bottlenecks
- test for large systems
@@ -27,6 +29,7 @@ class Location():
## TODO: remove cyclic references(?)
self.container = container
self.address = address # represents position along contour length in segments
# assert( type_ in ("end3","end5") ) # TODO remove or make conditional
self.type_ = type_
self.particle = None
@@ -84,6 +87,7 @@ class SegmentParticle(PointParticle):
return B.address-nt/seg.num_nts
else:
return B.address+nt/seg.num_nts
## ERROR
print("")
for c,A,B in self.parent.get_connections_and_locations():
print(" ",c.type_)
@@ -176,11 +180,17 @@ class Segment(ConnectableElement, Group):
orientation = orientation.dot( self.start_orientation )
else:
q = interpolate.splev(s, self.quaternion_spline_params)
q = q/np.linalg.norm(q)
orientation = quaternion_to_matrix(q)
else:
orientation = None
return orientation
def get_contour_sorted_connections_and_locations(self):
sort_fn = lambda c: c[1].address
cl = self.get_connections_and_locations()
return sorted(cl, key=sort_fn)
def _get_num_beads(self, max_basepairs_per_bead, max_nucleotides_per_bead ):
raise NotImplementedError
@@ -188,6 +198,100 @@ class Segment(ConnectableElement, Group):
def _generate_one_bead(self, contour_position, nts):
raise NotImplementedError
def _generate_atomic_nucleotide(self, contour_position, is_fwd, seq):
""" Seq should include modifications like 5T, T3 Tsinglet; direction matters too """
# print("Generating nucleotide at {}".format(contour_position))
pos = self.contour_to_position(contour_position)
if self.local_twist:
orientation = self.contour_to_orientation(contour_position)
## TODO: move this code (?)
if orientation is None:
axis = self.contour_to_tangent(contour_position)
angleVec = np.array([1,0,0])
if axis.dot(angleVec) > 0.9: angleVec = np.array([0,1,0])
angleVec = angleVec - angleVec.dot(axis)*axis
angleVec = angleVec/np.linalg.norm(angleVec)
y = np.cross(axis,angleVec)
orientation = np.array([angleVec,y,axis]).T
## TODO: improve placement of ssDNA
# rot = rotationAboutAxis( axis, contour_position*self.twist_per_nt*self.num_nts, normalizeAxis=True )
# orientation = rot.dot(orientation)
else:
orientation = orientation
else:
raise NotImplementedError
# key = self.sequence
# if self.ntAt5prime is None and self.ntAt3prime is not None: key = "5"+key
# if self.ntAt5prime is not None and self.ntAt3prime is None: key = key+"3"
# if self.ntAt5prime is None and self.ntAt3prime is None: key = key+"singlet"
key = seq
## TODO
if not is_fwd:
nt_dict = canonicalNtFwd
else:
nt_dict = canonicalNtRev
atoms = nt_dict[ key ].duplicate() # TODO: clone
# print( atoms.orientation, orientation )
# atoms.orientation = atoms.orientation.dot( orientation )
atoms.orientation = orientation.dot(atoms.orientation)
atoms.position = pos
## TODO: scale positions
return atoms
def get_5prime_locations(self):
""" Returns tuple of contour_position and direction of strand
True represents a strand whose 5-to-3 direction increases with contour
"""
raise NotImplementedError
def get_end_of_strand(self, contour_pos, is_fwd):
## connections to other segments
cl = self.get_contour_sorted_connections_and_locations()
if not is_fwd: # reverse sorted list if not forward
cl = cl[::-1]
## TODO include nicks
## Iterate through connection locations in segment
for c,l,B in cl:
if l.particle is None:
pos = l.address
else:
pos = l.particle.get_contour_position()
## skip locations that are not in the correct direction
if is_fwd:
if pos <= contour_pos: continue
elif pos >= contour_pos: continue
## Stop at every crossover
if c.type_ == "crossover":
if l.type_ == is_fwd:
print(" crossover -> cross")
return pos, B.container, B.address, B.type_
else:
print(" crossover -> continue")
return pos, l.container, pos, is_fwd # break here, but continue strand
if l.type_ == "end3":
print(" end3")
assert( np.abs(B.address) < 1e-2 or np.abs(B.address-1) < 1e-2 )
direction = B.address < 0.5
return pos, B.container, B.address, direction
if is_fwd:
return 1, None, None, None
else:
return 0, None, None, None
def get_nearest_bead(self, contour_position):
if len(self.beads) < 1: return None
cs = np.array([b.contour_position for b in self.beads]) # TODO: cache
@@ -245,7 +349,8 @@ class Segment(ConnectableElement, Group):
## (currently unused)
## First find points between-which beads must be generated
locs = sorted([loc for c,loc,other_loc in self.get_connections_and_locations()], key=lambda l:l.address)
conn_locs = self.get_contour_sorted_connections_and_locations()
locs = [A for c,A,B in conn_locs]
existing_beads = [l.particle for l in locs if l.particle is not None]
for b in existing_beads:
assert(b.parent is not None)
@@ -301,9 +406,6 @@ class Segment(ConnectableElement, Group):
def _regenerate_beads(self, max_nts_per_bead=4, ):
...
def _generate_atomic(self, atomic_model):
...
class DoubleStrandedSegment(Segment):
@@ -377,12 +479,10 @@ class DoubleStrandedSegment(Segment):
## Create locations, connections and add to segments
c = nt/self.num_nts
print(self.name,nt,c)
assert(c >= 0 and c <= 1)
loc = Location( self, address=c, type_ = strands_fwd[0] )
c = other_nt/other.num_nts
print(other.name,other_nt,c)
assert(c >= 0 and c <= 1)
other_loc = Location( other, address=c, type_ = strands_fwd[1] )
self._connect(other, Connection( loc, other_loc, type_="crossover" ))
@@ -401,6 +501,20 @@ class DoubleStrandedSegment(Segment):
def _get_num_beads(self, contour, max_basepairs_per_bead, max_nucleotides_per_bead):
return int(contour*self.num_nts // max_basepairs_per_bead)
def get_5prime_locations(self):
locs = []
## Add ends of segment if they aren't connected
cl = self.get_connections_and_locations()
connlocs = [A for c,A,B in cl]
if self.start5 not in connlocs:
locs.append((0,True)) # TODO
if self.end5 not in connlocs:
locs.append((1,False)) # TODO
## TODO Add nicks
return locs
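## Example: for an isolated double-stranded segment with no connections at either
## end, this returns [(0, True), (1, False)], i.e. the 5' end of the forward strand
## at contour 0 and the 5' end of the reverse strand at contour 1; nicks are still
## TODO per the comment above.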
def _generate_one_bead(self, contour_position, nts):
pos = self.contour_to_position(contour_position)
if self.local_twist:
@@ -419,31 +533,7 @@ class DoubleStrandedSegment(Segment):
contour_position=contour_position )
self._add_bead(bead)
return bead
def _generate_atomic(self, atomic_model):
...
# def add_crossover(self, locationInA, B, locationInB):
# j = Crossover( [self, B], [locationInA, locationInB] )
# self._join(B,j)
# def add_internal_crossover(self, locationInA, B, locationInB):
# j = Crossover( [self, B], [locationInA, locationInB] )
# self._join(B,j)
# def stack_end(self, myEnd):
# ## Perhaps this should not really be possible; these ends should be part of same helix
# ...
# def connect_strand(self, other):
# ...
# def break_apart(self):
# """Break into smaller pieces so that "crossovers" are only at the ends"""