From c9b692fadf969a85ce5a1e9a2020770f04acb169 Mon Sep 17 00:00:00 2001 From: Chris Maffeo <cmaffeo2@illinois.edu> Date: Mon, 20 Aug 2018 08:17:22 -0500 Subject: [PATCH] Added support for ENRG MD restraints; improved organization of "strand_pieces" --- cadnano_segments.py | 1 - segmentmodel.py | 412 ++++++++++++++++++++++---------------------- 2 files changed, 208 insertions(+), 205 deletions(-) diff --git a/cadnano_segments.py b/cadnano_segments.py index b8149c2..4bb7b06 100644 --- a/cadnano_segments.py +++ b/cadnano_segments.py @@ -543,7 +543,6 @@ def read_model(json_data, sequence=None): max_basepairs_per_bead = 5, max_nucleotides_per_bead = 5, local_twist=False) - model._generate_strands() # TODO: move into model creation # TODO # try: diff --git a/segmentmodel.py b/segmentmodel.py index c45801f..bd12bac 100644 --- a/segmentmodel.py +++ b/segmentmodel.py @@ -357,6 +357,10 @@ class Segment(ConnectableElement, Group): self._bead_model_generation = 0 # TODO: remove? self.segment_model = segment_model # TODO: remove? + self.strand_pieces = dict() + for d in ('fwd','rev'): + self.strand_pieces[d] = [] + self.num_nt = int(num_nt) if end_position is None: end_position = np.array((0,0,self.distance_per_nt*num_nt)) + start_position @@ -409,7 +413,6 @@ class Segment(ConnectableElement, Group): def contour_to_orientation(self,s): assert( isinstance(s,float) or isinstance(s,int) or len(s) == 1 ) # TODO make vectorized version - orientation = None if self.quaternion_spline_params is None: axis = self.contour_to_tangent(s) axis = axis / np.linalg.norm(axis) @@ -461,31 +464,26 @@ class Segment(ConnectableElement, Group): # print("Generating nucleotide at {}".format(contour_position)) pos = self.contour_to_position(contour_position) - if self.local_twist: - orientation = self.contour_to_orientation(contour_position) - ## TODO: move this code (?) - if orientation is None: - axis = self.contour_to_tangent(contour_position) - angleVec = np.array([1,0,0]) - if axis.dot(angleVec) > 0.9: angleVec = np.array([0,1,0]) - angleVec = angleVec - angleVec.dot(axis)*axis - angleVec = angleVec/np.linalg.norm(angleVec) - y = np.cross(axis,angleVec) - orientation = np.array([angleVec,y,axis]).T - ## TODO: improve placement of ssDNA - # rot = rotationAboutAxis( axis, contour_position*self.twist_per_nt*self.num_nt, normalizeAxis=True ) - # orientation = rot.dot(orientation) - else: - orientation = orientation - - else: - raise NotImplementedError - - # key = self.sequence - # if self.ntAt5prime is None and self.ntAt3prime is not None: key = "5"+key - # if self.ntAt5prime is not None and self.ntAt3prime is None: key = key+"3" - # if self.ntAt5prime is None and self.ntAt3prime is None: key = key+"singlet" + orientation = self.contour_to_orientation(contour_position) + """ deleteme + ## TODO: move this code (?) + if orientation is None: + import pdb + pdb.set_trace() + axis = self.contour_to_tangent(contour_position) + angleVec = np.array([1,0,0]) + if axis.dot(angleVec) > 0.9: angleVec = np.array([0,1,0]) + angleVec = angleVec - angleVec.dot(axis)*axis + angleVec = angleVec/np.linalg.norm(angleVec) + y = np.cross(axis,angleVec) + orientation = np.array([angleVec,y,axis]).T + ## TODO: improve placement of ssDNA + # rot = rotationAboutAxis( axis, contour_position*self.twist_per_nt*self.num_nt, normalizeAxis=True ) + # orientation = rot.dot(orientation) + else: + orientation = orientation + """ key = seq nt_dict = canonicalNtFwd if is_fwd else canonicalNtRev @@ -547,6 +545,24 @@ class Segment(ConnectableElement, Group): for c in cl: yield c + ## TODO rename + def _add_strand_piece(self, strand_piece): + """ Registers a strand segment within this object """ + + ## TODO use weakref + d = 'fwd' if strand_piece.is_fwd else 'rev' + + ## Validate strand_piece (ensure no clashes) + for s in self.strand_pieces[d]: + l,h = sorted((s.start,s.end)) + for value in (strand_piece.start,strand_piece.end): + assert( value < l or value > h ) + + ## Add strand_piece in correct order + self.strand_pieces[d].append(strand_piece) + self.strand_pieces[d] = sorted(self.strand_pieces[d], + key = lambda x: x.start) + ## TODO rename def get_strand_segment(self, nt_pos, is_fwd, move_at_least=0.5): """ Walks through locations, checking for crossovers """ @@ -623,6 +639,15 @@ class Segment(ConnectableElement, Group): return self.beads[i] + def _get_atomic_nucleotide(self, nucleotide_idx, is_fwd=True): + d = 'fwd' if is_fwd else 'rev' + for s in self.strand_pieces[d]: + try: + return s.get_nucleotide(nucleotide_idx) + except: + pass + raise Exception("Could not find nucleotide in {} at {}.{}".format( self, nucleotide_idx, d )) + def get_all_consecutive_beads(self, number): assert(number >= 1) ## Assume that consecutive beads in self.beads are bonded @@ -1030,7 +1055,6 @@ class SingleStrandedSegment(Segment): contour_position=contour_position ) self._add_bead(b) return b - class StrandInSegment(Group): """ Represents a piece of an ssDNA strand within a segment """ @@ -1048,6 +1072,7 @@ class StrandInSegment(Group): nts = np.abs(end-start)+1 self.num_nt = int(round(nts)) assert( np.isclose(self.num_nt,nts) ) + segment._add_strand_piece(self) def _nucleotide_ids(self): nt0 = self.start # seg.contour_to_nt_pos(self.start) @@ -1070,6 +1095,19 @@ class StrandInSegment(Group): def get_contour_points(self): c0,c1 = [self.segment.nt_pos_to_contour(p) for p in (self.start,self.end)] return np.linspace(c0,c1,self.num_nt) + + def get_nucleotide(self, idx): + """ idx expressed as nt coordinate within segment """ + + lo,hi = sorted((self.start,self.end)) + if self.is_fwd: + idx_in_strand = idx - lo + else: + idx_in_strand = hi - idx + assert( np.isclose( idx_in_strand , int(round(idx_in_strand)) ) ) + assert(idx_in_strand >= 0) + return self.children[int(round(idx_in_strand))] + class Strand(Group): """ Represents an entire ssDNA strand from 5' to 3' as it routes through segments """ @@ -1132,7 +1170,7 @@ class Strand(Group): # assert( len(ret) == self.num_nt ) # return ret - def generate_atomic_model(self,scale): + def generate_atomic_model(self, scale, first_atomic_index): last = None resid = 1 ## TODO relabel "strand_segment" @@ -1174,6 +1212,9 @@ class Strand(Group): nt.__dict__['resid'] = resid resid += 1 last = nt + nt._first_atomic_index = first_atomic_index + first_atomic_index += len(nt.children) + return first_atomic_index def update_atomic_orientations(self,default_orientation): last = None @@ -1220,7 +1261,8 @@ class SegmentModel(ArbdModel): self._generate_bead_model( max_basepairs_per_bead, max_nucleotides_per_bead, local_twist, escapable_twist) self.useNonbondedScheme( nbDnaScheme ) - + self.useTclForces = False + self._generate_strands() def get_connections(self,type_=None,exclude=()): """ Find all connections in model, without double-counting """ @@ -2146,6 +2188,21 @@ class SegmentModel(ArbdModel): s.segname = "D%03d" % counter counter += 1 + def _assign_basepairs(self): + ## Assign basepairs + for seg in self.segments: + if isinstance(seg, DoubleStrandedSegment): + strands1 = seg.strand_pieces['fwd'] # already sorted + strands2 = seg.strand_pieces['rev'] + + nts1 = [nt for s in strands1 for nt in s.children] + nts2 = [nt for s in strands2 for nt in s.children[::-1]] + assert(len(nts1) == len(nts2)) + for nt1,nt2 in zip(nts1,nts2): + ## TODO weakref + nt1.basepair = nt2 + nt2.basepair = nt1 + def _update_orientations(self,orientation): for s in self.strands: s.update_atomic_orientations(orientation) @@ -2162,202 +2219,149 @@ class SegmentModel(ArbdModel): else: raise Exception("Lattice type '%s' not supported" % self.latticeType) - + ## TODO: allow ENM to be created without first building atomic model noStackPrime = 0 noBasepair = 0 with open("%s.exb" % prefix,'w') as fh: # natoms=0 - for seg in self.segments: - for nt1 in seg: - other = [] - nt2 = nt1.basepair - if nt2 is not None: - if nt2.firstAtomicIndex > nt1.firstAtomicIndex: + for seg in self.segments: + ## Continue unless dsDNA + if not isinstance(seg,DoubleStrandedSegment): continue + for strand_piece in seg.strand_pieces['fwd'] + seg.strand_pieces['rev']: + for nt1 in strand_piece.children: + other = [] + nt2 = nt1.basepair + if strand_piece.is_fwd: other.append((nt2,'pair')) - nt2 = nt2.stack3prime - if nt2 is not None and nt2.firstAtomicIndex > nt1.firstAtomicIndex: + + nt2 = nt2.get_intrahelical_above() + if nt2 is not None and strand_piece.is_fwd: + ## TODO: check if this already exists other.append((nt2,'paircross')) - else: - noBasepair += 1 - - nt2 = nt1.stack3prime - if nt2 is not None: - other.append((nt2,'stack')) - nt2 = nt2.basepair - if nt2 is not None and nt2.firstAtomicIndex > nt1.firstAtomicIndex: - other.append((nt2,'cross')) - else: - noStackPrime += 1 - - a1 = nt1.atoms(transform=False) - for nt2,key in other: - key = ','.join((key,nt1.sequence[0],nt2.sequence[0])) - for n1, n2, d in enmTemplate[key]: - d = float(d) - a2 = nt2.atoms(transform=False) - i,j = [a.index(n)-1 for a,n in zip((a1,a2),(n1,n2))] - # try: - # i,j = [a.index(n)-1 for a,n in zip((a1,a2),(n1,n2))] - # except: - # continue - - k = 0.1 - if self.latticeType == 'honeycomb': - correctionKey = ','.join((key,n1,n2)) - assert(correctionKey in enmCorrectionsHC) + nt2 = nt1.get_intrahelical_above() + if nt2 is not None: + other.append((nt2,'stack')) + nt2 = nt2.basepair + if nt2 is not None: + other.append((nt2,'cross')) + + for nt2,key in other: + """ + if np.linalg.norm(nt2.position-nt1.position) > 7: + import pdb + pdb.set_trace() + """ + key = ','.join((key,nt1.sequence[0],nt2.sequence[0])) + for n1, n2, d in enmTemplate[key]: + d = float(d) + k = 0.1 + if lattice_type == 'honeycomb': + correctionKey = ','.join((key,n1,n2)) + assert(correctionKey in enmCorrectionsHC) dk,dr = enmCorrectionsHC[correctionKey] k = float(dk) d += float(dr) - i += nt1.firstAtomicIndex - j += nt2.firstAtomicIndex - fh.write("bond %d %d %f %.2f\n" % (i,j,k,d)) - - - print("NO STACKS found for:", noStackPrime) - print("NO BASEPAIRS found for:", noBasepair) - - props, origins = self.part.helixPropertiesAndOrigins() - origins = [np.array(o) for o in origins] - - ## Push bonds - pushBonds = [] - - fwdDict = dict() # dictionaries of nts with keys [hid][zidx] - revDict = dict() - xo = dict() # dictionary of crossovers between [hid1][hid2] - - helixIds = [hid for s in self.segments for hid in s.nodes.keys()] - helixIds = sorted(list(set(helixIds))) - - #- initialize dictionaries - for h1 in helixIds: - fwdDict[h1] = dict() - revDict[h1] = dict() - xo[h1] = dict() - for h2 in helixIds: - xo[h1][h2] = [] - - #- fill dictionaries - for seg in self.segments: - for nt in seg: - hid = nt.prop['helixId'] - - ## Add nt to ntDict - idx = nt.prop['idx'] - zidx = nt.prop['zIdx'] - isFwd = nt.prop['isFwd'] - if isFwd: - fwdDict[hid][idx] = nt - else: - revDict[hid][idx] = nt - - ## Find crossovers and ends - for nt2 in (nt.ntAt5prime, nt.ntAt3prime): - if nt2 is not None: - hid2 = nt2.prop['helixId'] - if hid2 != hid: - # xo[hid][hid2].append(zidx) - xo[hid][hid2].append(idx) - xo[hid2][hid].append(idx) - - props = self.part.getModelProperties() - if props.get('point_type') == PointType.ARBITRARY: - raise Exception("Not implemented") - else: - props, origins = self.part.helixPropertiesAndOrigins() - neighborHelices = atomicModel._getNeighborHelixDict(self.part) - - if self.latticeType == 'honeycomb': - # minDist = 8 # TODO: test against server - minDist = 11 # Matches ENRG MD server... should it? - elif self.latticeType == 'square': - minDist = 11 - - for hid1 in helixIds: - for hid2 in neighborHelices[hid1]: - if hid2 not in helixIds: - # print("WARNING: writeENM: helix %d not in helixIds" % hid2) # xo[%d] dict" % (hid2,hid1)) + i = nt1._get_atomic_index(name=n1) + j = nt2._get_atomic_index(name=n2) + fh.write("bond %d %d %f %.2f\n" % (i,j,k,d)) + + # print("NO STACKS found for:", noStackPrime) + # print("NO BASEPAIRS found for:", noBasepair) + + ## Loop dsDNA regions + push_bonds = [] + processed_segs = set() + ## TODO possibly merge some of this code with SegmentModel.get_consecutive_crossovers() + for segI in self.segments: # TODOTODO: generalize through some abstract intrahelical interface that effectively joins "segments", for now interhelical bonds that cross intrahelically-connected segments are ignored + if not isinstance(segI,DoubleStrandedSegment): continue + + ## Loop over dsDNA regions connected by crossovers + conn_locs = segI.get_contour_sorted_connections_and_locations("crossover") + other_segs = list(set([B.container for c,A,B in conn_locs])) + for segJ in other_segs: + if (segI,segJ) in processed_segs: + continue + processed_segs.add((segI,segJ)) + processed_segs.add((segJ,segI)) + + ## TODO perhaps handle ends that are not between crossovers + + ## Loop over ordered pairs of crossovers between the two + cls = filter(lambda x: x[-1].container == segJ, conn_locs) + cls = sorted( cls, key=lambda x: x[1].get_nt_pos() ) + for cl1,cl2 in zip(cls[:-1],cls[1:]): + c1,A1,B1 = cl1 + c2,A2,B2 = cl2 + + ntsI1,ntsI2 = [segI.contour_to_nt_pos(A.address) for A in (A1,A2)] + ntsJ1,ntsJ2 = [segJ.contour_to_nt_pos(B.address) for B in (B1,B2)] + ntsI = ntsI2-ntsI1+1 + ntsJ = ntsJ2-ntsJ1+1 + assert( np.isclose( ntsI, int(round(ntsI)) ) ) + assert( np.isclose( ntsJ, int(round(ntsJ)) ) ) + ntsI,ntsJ = [int(round(i)) for i in (ntsI,ntsJ)] + + ## Find if dsDNA "segments" are pointing in same direction + ## could move this block out of the loop + tangentA = segI.contour_to_tangent(A1.address) + tangentB = segJ.contour_to_tangent(B1.address) + dot1 = tangentA.dot(tangentB) + tangentA = segI.contour_to_tangent(A2.address) + tangentB = segJ.contour_to_tangent(B2.address) + dot2 = tangentA.dot(tangentB) + + if dot1 > 0.5 and dot2 > 0.5: + ... + elif dot1 < -0.5 and dot2 < -0.5: + ## TODO, reverse + ... + print("Warning: {} and {} are on antiparallel helices (not yet implemented)... skipping".format(A,B)) + continue + else: + print("Warning: {} and {} are on helices that do not point in similar direction... skipping".format(A,B)) continue - ## Build chunk for helix pair - chunks = self._getDsChunksForHelixPair(hid1,hid2) - - for lo,hi in chunks: - # pdb.set_trace() - nts1 = self.getNtsBetweenCadnanoIdxInclusive(hid1,lo,hi) - nts2 = self.getNtsBetweenCadnanoIdxInclusive(hid2,lo,hi) - - if len(nts1) <= len(nts2): - iVals = list(range(len(nts1))) - if len(nts1) > 1: - jVals = [int(round(j*(len(nts2)-1)/(len(nts1)-1))) for j in range(len(nts1))] - else: - jVals = [0] - else: - if len(nts2) > 1: - iVals = [int(round(i*(len(nts1)-1)/(len(nts2)-1))) for i in range(len(nts2))] - else: - iVals = [0] - jVals = list(range(len(nts2))) - - ntPairs = [[nts1[i],nts2[j]] for i,j in zip(iVals,jVals)] - - ## Skip pairs close to nearest crossover on lo side - xoIdx = [idx for idx in xo[hid1][hid2] if idx <= lo] - if len(xoIdx) > 0: - xoIdx = max(xoIdx) - assert( type(lo) is int ) - xoDist = min([len(self.getNtsBetweenCadnanoIdxInclusive(hid,xoIdx,lo-1)) for hid in (hid1,hid2)]) - - skip=minDist-xoDist - if skip > 0: - ntPairs = ntPairs[skip:] - - ## Skip pairs close to nearest crossover on hi side - xoIdx = [idx for idx in xo[hid1][hid2] if idx >= hi] - if len(xoIdx) > 0: - xoIdx = min(xoIdx) - xoDist = min([len(self.getNtsBetweenCadnanoIdxInclusive(hid,hi+1,xoIdx)) for hid in (hid1,hid2)]) - - skip=minDist-xoDist - if skip > 0: - ntPairs = ntPairs[:-skip] - - - for ntPair in ntPairs: - bps = [nt.basepair for nt in ntPair] - if None in bps: continue - for nt1,nt2 in [ntPair,bps]: - i,j = [nt.firstAtomicIndex for nt in (nt1,nt2)] - if j <= i: continue - - if np.linalg.norm(nt1.position-nt2.position) > 45: continue - - ai,aj = [nt.atoms(transform=False) for nt in (nt1,nt2)] - try: - i += ai.index('P')-1 - j += aj.index('P')-1 - pushBonds.append(i) - pushBonds.append(j) - except: - pass - - print("PUSH BONDS:", len(pushBonds)/2) + ## Go through each nucleotide between the two + for ijmin in range(min(ntsI,ntsJ)): + i=j=ijmin + if ntsI < ntsJ: + j = int(round(float(ntsJ*i)/ntsI)) + elif ntsJ < ntsI: + i = int(round(float(ntsI*j)/ntsJ)) + ntI_idx = int(round(ntsI1+i)) + ntJ_idx = int(round(ntsJ1+j)) + + ## Skip nucleotides that are too close to crossovers + if i < 11 or j < 11: continue + if ntsI2-ntI_idx < 11 or ntsJ2-ntJ_idx < 11: continue + + ## Find phosphates at ntI/ntJ + for direction in [True,False]: + try: + i = segI._get_atomic_nucleotide(ntI_idx, direction)._get_atomic_index(name="P") + j = segJ._get_atomic_nucleotide(ntJ_idx, direction)._get_atomic_index(name="P") + push_bonds.append((i,j)) + except: + # print("WARNING: could not find 'P' atom in {}:{} or {}:{}".format( segI, ntI_idx, segJ, ntJ_idx )) + ... + + print("PUSH BONDS:", len(push_bonds)) if not self.useTclForces: with open("%s.exb" % prefix, 'a') as fh: - for i,j in zip(pushBonds[::2],pushBonds[1::2]): + for i,j in push_bonds: fh.write("bond %d %d %f %.2f\n" % (i,j,1.0,31)) else: - atomList = list(set(pushBonds)) + flat_push_bonds = list(sum(push_bonds)) + atomList = list(set( flat_push_bonds )) with open("%s.forces.tcl" % prefix,'w') as fh: fh.write("set atomList {%s}\n\n" % " ".join([str(x-1) for x in atomList]) ) fh.write("set bonds {%s}\n" % - " ".join([str(x-1) for x in pushBonds]) ) + " ".join([str(x-1) for x in flat_push_bonds]) ) fh.write(""" foreach atom $atomList { addatom $atom @@ -2388,12 +2392,12 @@ proc calcforces {} { } """) - - def _generate_atomic_model(self, scale=1): self.children = self.strands + first_atomic_index = 0 for s in self.strands: - s.generate_atomic_model(scale) + first_atomic_index = s.generate_atomic_model(scale,first_atomic_index) + self._assign_basepairs() return ## Angle optimization -- GitLab