#=========================================================================================
# Licence, Reference and Credits
#=========================================================================================
__copyright__ = "Copyright (C) CCPN project (http://www.ccpn.ac.uk) 2014 - 2021"
__credits__ = ("Ed Brooksbank, Luca Mureddu, Timothy J Ragan & Geerten W Vuister")
__licence__ = ("CCPN licence. See http://www.ccpn.ac.uk/v3-software/downloads/license")
__reference__ = ("Skinner, S.P., Fogh, R.H., Boucher, W., Ragan, T.J., Mureddu, L.G., & Vuister, G.W.",
                 "CcpNmr AnalysisAssign: a flexible platform for integrated NMR analysis",
                 "J.Biomol.Nmr (2016), 66, 111-124, http://doi.org/10.1007/s10858-016-0060-y")
#=========================================================================================
# Last code modification
#=========================================================================================
__modifiedBy__ = "$modifiedBy: Luca Mureddu $"
__dateModified__ = "$dateModified: 2021-02-08 19:23:47 +0000 (Mon, February 08, 2021) $"
__version__ = "$Revision: 3.0.3 $"
#=========================================================================================
# Created
#=========================================================================================
__author__ = "$Author: Luca Mureddu $"
__date__ = "$Date: 2021-02-08 10:28:42 +0000 (Monday, February 08, 2021) $"
#=========================================================================================
# Start of code
#=========================================================================================
 -  
 - """
 - This macro writes an NmrPipe-like file using some of the CcpNmr peak properties.
 -  
 - RUN: 
 -     - From Macro Editor:
 -         * Main Menu -> Macro -> New
 -         * Copy this macro inside the Macro Editor module
 -         * Define a spectrumName and the outputPath varialbles, (and any other available as required).
 -         * Click run (Green `play` button) 
 -  
 -     - From Python Console
 -          * Copy this macro in a new text file and add the extension `.py`
 -          * Main Menu -> View -> Python Console (or space-space shortcut) and type:
 -          * %run -i thisMacro.py -o outputPath -s spectrumName
 -             outputPath:  the full directory path where you want save the new file
 -             spectrumName: the spectrum of interest
 -  
 -  
 - Post your changes/improvements on the Ccpn Forum!
 -  
 - If guessClusterId is set to True, it will add a cluster id for each peak.
 - The algorithm used is based on a "connected component analysis" as seen in https://stackoverflow.com/questions/14607317/
 - and might not produce the expected results! There are many other (more efficient) ways of doing this operation
 - but is not the purpose of this macro! Set guessClusterId to False  if not happy with the result.
 - In Peak there isn`t a property for clusterID (although we use Multiplets) so for this macro
 - is hack-ish stored as peak.annotation.
 -  
 - Gui Tips:
 -     - To cycle the peakLabels on display use the shortcut "p+l" until you find the one you need.
 -     - To cycle the peakSymbols on display use the shortcut "p+s" until you find the one you need.
 -     - Change a peak annotation from a peakTable, double click on its cell value.
 -  
 - Warnings:
 -      - Always backup the project before running macros!
 -      - The macro will set peak annotations. This will help visualising the clusters ids. If overwrites your annotations,
 -        undo to revert to the previous state.
 -      - Before running the macro close all GuiTables to avoid waiting for Gui updates!
 -      - Tested only for 2D spectra and hard-coded on first two dimensions. Will not work on 1D as it is now.
 -  
 - """
 -  
 -  
 - ##############################     User`s settings      #################################
 -  
 - spectrumName = `test`           # Name for the spectrum of interest. (Required)
 - spectrumGroupName = `group`     # Name for the spectrumGroup containing the spectra of interest (Optional). If set, the spectrumName is skipped.
 - peakListIndex = -1              # PeakList index to use. Default last added, index: -1. (Optional)
 -  
 - export = True                   # Set to False if don`t want to export. Default True. (Optional)
 - outputPath = `~/Desktop`        # Path dir where to export the files. FileName from the spectrum Name. (Required)
 -  
 -  
 - # clustering options
 - guessClusterId = True            # Guess a clusterId for all peaks. False to use just a serial. Default True. (Optional)
 - tolDim1 = 8                      # tolerance for the first dimension (in points). Used to find adjacent peaks. (Optional)
 - tolDim2 = 8                      # tolerance for the second dimension (in points). (Optional)
 - clusterByPositions  = True       # cluster peaks if their positions are within tolDim1 and tolDim2. Default True. (Optional)
 - clusterByLWs = False             # cluster peaks with overlapping lineWidths. If True, clusterByPositions will be False. (Optional)
 - increaseLWByNpercent = 50        # increase (on-the-fly) the lw by n% value for finding overlaps. No data is modified. (Optional)
 -  
 -  
 - ##############################    Start of the code      #################################
 -  
 - import numpy as np
 - import itertools
 - import pandas as pd
 - import argparse
 - from collections import Counter, defaultdict
 - from ccpn.util.Path import aPath, checkFilePath
 - from ccpn.core.lib.ContextManagers import undoBlockWithoutSideBar
 - from ccpn.util.Common import percentage
 - from ccpn.util.Common import makeIterableList as mi
 -  
 -  
 - def getArgs():
 -     parser = argparse.ArgumentParser( description=`Create an NmrPipe Tab file`)
 -     parser.add_argument(`-o`,  `--outputPath`, help=`Output directory path`, default=outputPath)
 -     parser.add_argument(`-s`,  `--spectrumName`, help=`Spectrum name`, default=spectrumName)
 -     parser.add_argument(`-g`,  `--spectrumGroupName`, help=`Spectrum group name`, default=spectrumGroupName)
 -     parser.add_argument(`-c`,  `--guessClusterId`, help=`Guess cluster Ids`, default=guessClusterId)
 -     parser.add_argument(`-t1`, `--tolDim1`, help=`Tolerance 1st dimension in points`, default=tolDim1)
 -     parser.add_argument(`-t2`, `--tolDim2`, help=`Tolerance 2nd dimension in points`, default=tolDim2)
 -     return parser
 -  
 - ##############################     Units convertion      #################################
 -  
 - def _hzLW2pnt(lwHz, sw, npoints):
 -     """
 -     :param lwHz: float. A peak lineWidth (in Hz)
 -     :param sw: float. Spectral width (in Hz)
 -     :param npoints: int. Total number of points
 -     :return: float. A peak lineWidth in points (per dimension)
 -     """
 -     if not all([lwHz, sw, npoints]): return
 -     hzXpnt = sw/npoints
 -     lwPnt = lwHz/hzXpnt
 -     return lwPnt
 -  
 - def getLineWidthsPnt(peak):
 -     sp = peak.peakList.spectrum
 -     return tuple(_hzLW2pnt(lwHz,sp.spectralWidthsHz[i],sp.totalPointCounts[i]) for i,lwHz in enumerate(peak.lineWidths))
 -  
 - def getPeakPositionHz(peak):
 -     return tuple([peak.position[i]*peak.peakList.spectrum.spectrometerFrequencies[i] for i,v in enumerate(peak.position)])
 -  
 - ############################## Guess peak clusters    #################################
 -  
 - def clusterOverlaps(nodes, adjacents):
 -     "Ref: https://stackoverflow.com/questions/14607317/"
 -     clusters = []
 -     nodes = list(nodes)
 -     while len(nodes):
 -         node = nodes[0]
 -         path = dfs(node, adjacents, nodes)
 -         clusters.append(path)
 -         for pt in path:
 -             nodes.remove(pt)
 -     return clusters
 -  
 - def dfs(start, adjacents, nodes):
 -     "Ref: https://stackoverflow.com/questions/14607317/"
 -     path = []
 -     q = [start]
 -     while q:
 -         node = q.pop(0)
 -         if path.count(node) >= nodes.count(node):
 -             continue
 -         path = path + [node]
 -         nextNodes = [p2 for p1,p2 in adjacents if p1 == node]
 -         q = nextNodes + q
 -     return path
 -  
 - def _getOverlapsByLineWidths(peaks, tolDim1=0.01, tolDim2=0.01, increaseByNpercent=0):
 -     """
 -     Consider two peaks adjacent if the LW are overlapping in both dimensions.
 -     :param peaks: list of ccpn peak objs
 -     :param increaseByNpercent: increase (on-the-fly) the lw by n% value for finding overlaps. No data is modified.
 -     :return: A list of adjacent pairs of peaks
 -     """
 -     overlaps = []
 -     if len(peaks) == 0: return []
 -     sp = peaks[0].peakList.spectrum
 -     for pair in itertools.combinations(peaks, 2):
 -         peakA, peakB = pair
 -         if not all(list(peakA.lineWidths) + list(peakB.lineWidths)):
 -             warning(`LineWidths is set to None for some peaks. Peaks skipped: %s` % ``.join(map(str, pair)))
 -             continue
 -         pos1A, pos2A = [peakA.pointPosition[i] for i in range(sp.dimensionCount)]
 -         lw1A, lw2A = [lw + percentage(increaseByNpercent, lw) for lw in getLineWidthsPnt(peakA)]
 -         pos1B, pos2B = [peakB.pointPosition[i] for i in range(sp.dimensionCount)]
 -         lw1B, lw2B = [lw + percentage(increaseByNpercent, lw) for lw in getLineWidthsPnt(peakB)]
 -         dim1a, dim2a = np.linspace(pos1A-(lw1A/2), pos1A+(lw1A/2),500), np.linspace(pos2A-(lw2A/2),pos2A+(lw2A/2),500)
 -         dim1b, dim2b = np.linspace(pos1B-(lw1B/2), pos1B+(lw1B/2),500), np.linspace(pos2B-(lw2B/2),pos2B+(lw2B/2),500)
 -         inters1 = dim1b[(np.abs(dim1a[:,None] - dim1b) < tolDim1).any(0)]
 -         inters2 = dim2b[(np.abs(dim2a[:,None] - dim2b) < tolDim2).any(0)]
 -         if all([len(inters1)>0, len(inters2)>0]):
 -             overlaps.append(pair)
 -     return overlaps
 -  
 - def _getOverlapPairsByPositions(peaks, tolDim1=10., tolDim2=10.):
 -     """
 -     Consider two peaks adjacent if the PointPositions are within the tolerances.
 -     :param peaks:
 -     :param tolDim1: tolerance for the first dimension (in points)
 -     :param tolDim2: tolerance for the second dimension (in points)
 -     :return: A list of adjacent pairs of peaks
 -     """
 -     overlaps = []
 -     for pair in itertools.combinations(peaks, 2):
 -         dim1a, dim2a = np.array(pair[0].pointPosition[0]), np.array(pair[0].pointPosition[1])
 -         dim1b, dim2b = np.array(pair[1].pointPosition[0]), np.array(pair[1].pointPosition[1])
 -         if (np.abs(dim1a - dim1b) < tolDim1) and (np.abs(dim2a - dim2b) < tolDim2):
 -             overlaps.append(pair)
 -     return overlaps
 -  
 - def _getClustID(peak):
 -     v = None
 -     try:
 -         v = int(peak.annotation)
 -     except Exception as e:
 -         warning(`Error converting peak annotation in int format. %s` %e)
 -     return v
 -  
 - def getMEMCNT(ids):
 -     return Counter(ids)
 -  
 - def setClusterIDs(peaks, guessClustID=True, tolDim1=8., tolDim2=8.):
 -     if guessClustID:
 -         if clusterByPositions and not clusterByLWs:
 -             overlappedPeaks = _getOverlapPairsByPositions(peaks, tolDim1=tolDim1, tolDim2=tolDim2)
 -         else:
 -             overlappedPeaks = _getOverlapsByLineWidths(peaks, increaseByNpercent=increaseLWByNpercent)
 -         positions = [pk.pointPosition for pk in peaks]
 -         overlappedPositions = [(pair[0].pointPosition, pair[1].pointPosition) for pair in overlappedPeaks]
 -         result = clusterOverlaps(positions, overlappedPositions)
 -         with undoBlockWithoutSideBar():
 -             allClusters = []
 -             for i, group in enumerate(result):
 -                 peakCluster = []
 -                 for peak in peaks:
 -                     for j in group:
 -                         if j == peak.pointPosition:
 -                             peakCluster.append(peak)
 -                             peak.annotation = str(i+1)
 -                 allClusters.append(peakCluster)
 -     else:
 -         with undoBlockWithoutSideBar():
 -             for i, peak in enumerate(peaks):
 -                 peak.annotation = str(i)
 -  
 - ############################## Build DataFrame from peaks #################################
 -  
 - def _getAssignmentLabel(peak):
 -     """
 -     :param peak:
 -     :return: str. the assignemnt in the format: TypeCode_AtomNames e.g. `LYS1_HN`.
 -         If multiple assignments per peak: `LYS1_HN-ALA2_H1N1`
 -     """
 -     dd = defaultdict(list)
 -     labels = []
 -     for na in mi(peak.assignments):
 -         dd[na.nmrResidue].append(na.name)
 -     for nr, nas in dd.items():
 -         l1 = ``.join([nr.residueType, nr.sequenceCode, `_`, *nas])
 -         labels.append(l1)
 -     if len(labels)>0:
 -         return `-`.join(labels)
 -  
 - VARS = [`INDEX`, `X_AXIS`, `Y_AXIS`, `DX`,     `DY`,   `X_PPM`, `Y_PPM`, `X_HZ`,  `Y_HZ`,  `XW`,    `YW`,    `XW_HZ`, `YW_HZ`, `X1`,  `X3`,   `Y1`, `Y3`,  `HEIGHT`, `DHEIGHT`, `VOL`, `PCHI2`, `TYPE`, `ASS`, `CLUSTID`, `MEMCNT`]
 - FORMAT = [`%5d`, `%9.3f`,   `%9.3f`, `%6.3f`, `%6.3f`, `%8.3f`, `%8.3f`, `%9.3f`, `%9.3f`, `%7.3f`, `%7.3f`, `%8.3f`, `%8.3f`, `%4d`, `%4d`, `%4d`, `%4d`, `%+e`,    `%+e`,     `%+e`, `%.5f`,   `%d`,  `%s`,  `%4d`,      `%4d`]
 - VFdict = {k:v for k,v in zip(VARS, FORMAT)}
 -  
 - # NMRPIPE VARS https://spin.niddk.nih.gov/NMRPipe/doc2new/
 -  
 - INDEX = `INDEX`     # REQUIRED      - The unique peak ID number.
 - X_AXIS = `X_AXIS`   # REQUIRED      - Peak position: in points in 1st dimension, from left of spectrum limit
 - Y_AXIS = `Y_AXIS`   # REQUIRED      - Peak position: in points in 2nd dimension, from bottom of spectrum limit
 - DX = `DX`           # NOT REQUIRED  - Estimate of the error in peak position due to random noise, in points.
 - DY = `DY`           # NOT REQUIRED  - Estimate of the error in peak position due to random noise, in points.
 - X_PPM = `X_PPM`     # NOT REQUIRED  - Peak position: in ppm in 1st dimension
 - Y_PPM = `Y_PPM`     # NOT REQUIRED  - Peak position: in ppm in 2nd dimension
 - X_HZ = `X_HZ`       # NOT REQUIRED  - Peak position: in Hz in 1st dimension
 - Y_HZ = `Y_HZ`       # NOT REQUIRED  - Peak position: in Hz in 2nd dimension
 - XW = `XW`           # REQUIRED      - Peak width: in points in 1st dimension
 - YW = `YW`           # REQUIRED      - Peak width: in points in 2nd dimension
 - XW_HZ = `XW_HZ`     # REQUIRED      - Peak width: in points in 1st dimension
 - YW_HZ = `YW_HZ`     # REQUIRED      - Peak width: in points in 2nd dimension
 - X1 = `X1`           # NOT REQUIRED  - Left border of peak in 1st dim, in points
 - X3 = `X3`           # NOT REQUIRED  - Right border of peak in 1st dim, in points
 - Y1 = `Y1`           # NOT REQUIRED  - Left border of peak in 2nd dim, in points
 - Y3 = `Y3`           # NOT REQUIRED  - Right border of peak in 2nd, in points
 - HEIGHT = `HEIGHT`   # NOT REQUIRED  - Peak height
 - DHEIGHT = `DHEIGHT` # NOT REQUIRED  - Peak height error
 - VOL = `VOL`         # NOT REQUIRED  - Peak volume
 - PCHI2 = `PCHI2`     # NOT REQUIRED  - the Chi-square probability for the peak (i.e. probability due to the noise)
 - TYPE = `TYPE`       # NOT REQUIRED  - the peak classification; 1=Peak, 2=Random Noise, 3=Truncation artifact.
 - ASS = `ASS`         # REQUIRED      - Peak assignment
 - CLUSTID = `CLUSTID` # REQUIRED      - Peak cluster id. Peaks with the same CLUSTID value are the overlapped.
 - MEMCNT = `MEMCNT`   # REQUIRED      - the total number of peaks which are in a given peak`s cluster
 -                                       # (i.e. peaks which have the same CLUSTID value)
 -  
 - NONE = None
 - NULLVALUE =  -666
 - NULLSTRING = `*`
 - UNKNOWN = 1
 - PCHI2Default = 0.00000
 - TYPEDefault = 1
 -  
 - VarsDict = {
 -         INDEX   : lambda x: VFdict.get(INDEX) % x.serial,
 -         X_AXIS  : lambda x: VFdict.get(X_AXIS) % (x.pointPosition[0] if x.pointPosition[0] else NULLVALUE),
 -         Y_AXIS  : lambda x: VFdict.get(Y_AXIS) % (x.pointPosition[1] if x.pointPosition[1] else NULLVALUE),
 -         DX      : lambda x: VFdict.get(DX) % (x.positionError[0] if x.positionError[0] else NULLVALUE),
 -         DY      : lambda x: VFdict.get(DY) % (x.positionError[1] if x.positionError[1] else NULLVALUE),
 -         X_PPM   : lambda x: VFdict.get(X_PPM) % (x.position[0] if x.position[0] else NULLVALUE),
 -         Y_PPM   : lambda x: VFdict.get(Y_PPM) % (x.position[1] if x.position[1] else NULLVALUE),
 -         X_HZ    : lambda x: VFdict.get(X_HZ) % (getPeakPositionHz(x)[0] if getPeakPositionHz(x)[0] else NULLVALUE),
 -         Y_HZ    : lambda x: VFdict.get(Y_HZ) % (getPeakPositionHz(x)[0] if getPeakPositionHz(x)[0] else NULLVALUE),
 -         XW      : lambda x: VFdict.get(XW) % (getLineWidthsPnt(x)[0] if getLineWidthsPnt(x)[0] else NULLVALUE),
 -         YW      : lambda x: VFdict.get(YW) % (getLineWidthsPnt(x)[1] if getLineWidthsPnt(x)[1] else NULLVALUE),
 -         XW_HZ   : lambda x: VFdict.get(XW_HZ) % (x.lineWidths[0] if x.lineWidths[0] else NULLVALUE),
 -         YW_HZ   : lambda x: VFdict.get(YW_HZ) % (x.lineWidths[1] if x.lineWidths[0] else NULLVALUE),
 -         X1      : lambda x: VFdict.get(X1) % (x.pointPosition[0]-10 if x.pointPosition[0] else NULLVALUE),
 -         X3      : lambda x: VFdict.get(X3) % (x.pointPosition[0]+10 if x.pointPosition[0] else NULLVALUE),
 -         Y1      : lambda x: VFdict.get(Y1) % (x.pointPosition[1]-10 if x.pointPosition[1] else NULLVALUE),
 -         Y3      : lambda x: VFdict.get(Y3) % (x.pointPosition[1]+10 if x.pointPosition[1] else NULLVALUE),
 -         HEIGHT  : lambda x: VFdict.get(HEIGHT) % (x.height if x.height else NULLVALUE),
 -         DHEIGHT : lambda x: VFdict.get(DHEIGHT) % (x.heightError if x.heightError else NULLVALUE),
 -         VOL     : lambda x: VFdict.get(VOL) % (x.volume if x.volume else NULLVALUE),
 -         PCHI2   : lambda x: VFdict.get(PCHI2) % (PCHI2Default),
 -         TYPE    : lambda x: VFdict.get(TYPE) % (TYPEDefault),
 -         ASS     : lambda x: VFdict.get(ASS) % (_getAssignmentLabel(x) if _getAssignmentLabel(x) else NULLSTRING),
 -         CLUSTID : lambda x: VFdict.get(CLUSTID) % (_getClustID(x) if _getClustID(x) else NULLVALUE),
 -         MEMCNT  : lambda x: VFdict.get(MEMCNT) % (UNKNOWN), # this is filled afterwards, if clusters
 -         }
 -  
 - def buildDataFrame(peaks):
 -     df = pd.DataFrame()
 -     for k, v in VarsDict.items():
 -         if v: df[k] = list(map(lambda x: v(x), peaks))
 -     _memcntDict = getMEMCNT(df[CLUSTID]) #set the MEMCNT
 -     df[MEMCNT] = [_memcntDict.get(i) for i in df[CLUSTID].values]
 -     df.set_index(df[INDEX], inplace=True)
 -     df.sort_index(inplace=True)
 -     return df
 -  
 - def dfToTab(df):
 -     "Return NmrPipe (NIH) tab-file formatted string"
 -     string = ``
 -     string += `VARS   %s\n` % ` `.join(VARS)
 -     string += `FORMAT %s\n\n` % ` `.join(FORMAT)
 -     string += `NULLVALUE -666`
 -     string += `\nNULLSTRING *\n\n`
 -     string += df.to_string(header=False, index=False) # formats are already defined when building the dataframe.
 -     return string
 -  
 - def runMacro():
 -  
 -     args = getArgs().parse_args()
 -     globals().update(args.__dict__)
 -     info(`Running macro with %s`%args)
 -  
 -     spectrum = get(`SP:`+spectrumName)
 -     spectrumGroup = get(`SG:`+spectrumGroupName)
 -     spectra = []
 -  
 -     if not any([spectrum, spectrumGroup]):
 -         msg = `You need at least a spectrum! Set the name at the beginning of this macro`
 -         print(msg)
 -         raise RuntimeError(msg)
 -  
 -     if spectrumGroup:
 -         spectra = spectrumGroup.spectra
 -         info(`Running macro using spectrumGroup`)
 -     else:
 -         if spectrum:
 -             info(`Running macro using a single spectrum`)
 -             spectra = [spectrum]
 -  
 -     if len(spectra)==0:
 -         msg = `You need at least a spectrum! Set the name at the beginning of this macro`
 -         print(msg)
 -         raise RuntimeError(msg)
 -  
 -     for spectrum in spectra:
 -         if len(spectrum.peakLists) == 0:
 -             warning(`You need at least a peakList for %s` %spectrum)
 -             continue
 -         if len(spectrum.peakLists[peakListIndex].peaks) == 0:
 -             warning(`You need at least a peak. %s skipped` %spectrum)
 -             continue
 -  
 -         peaks = spectrum.peakLists[peakListIndex].peaks
 -         peaks.sort(key=lambda x: x.pointPosition[0], reverse=False)
 -         _guessClusterId = True if guessClusterId in [`True`, `y`, `Y`, `yes`, `true`] else False
 -         setClusterIDs(peaks, _guessClusterId, float(tolDim1), float(tolDim2))
 -         df = buildDataFrame(peaks)
 -         if export:
 -             isPathOk, msgErr = checkFilePath(aPath(outputPath))
 -             if not isPathOk:
 -                 print(`File not saved. Error: %s.`%msgErr)
 -             else:
 -                 fileName = aPath(outputPath).joinpath(spectrum.name + `.tab`)
 -                 string = dfToTab(df)
 -                 with open(fileName, `w`) as f:
 -                     f.write(string)
 -                     print(`File saved in: %s.` % fileName)
 -  
 - if __name__ == `__main__`:
 -     runMacro()