Python tnsnames.ora解析器

4

我需要一个包含所有tnsnames.ora文件中的数据库连接的字典。

我需要将其转换为:

(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=mydbserver.mydomain.com)(PORT=1521)))(CONNECT_DATA=(SID=CATAL)(SERVER=DEDICATED)(SERVICE_NAME=mydb.mydomain.com)))

到这个:

{'DESCRIPTION': [{'ADDRESS_LIST': [{'ADDRESS': [{'PROTOCOL': 'TCP'},
                                               {'HOST': 'mydbserver.mydomain.com'},
                                               {'PORT': '1521'}
                                               ]
                                   }]
                  },
                  {'CONNECT_DATA': [{'SID': 'CATAL'},
                                    {'SERVER': 'DEDICATED'},
                                    {'SERVICE_NAME': 'mydb.mydomain.com'}
                                   ]
                  }
                ]
}

到目前为止,我的代码如下:
def get_param(param_string):
    print("get_param input:", param_string)
    if param_string.count("(") != param_string.count(")"):
        raise Exception("Number of '(' is not egal to number of ')' : " + str(param_string.count("(")) + " and " + str(param_string.count(")")))
    else:
        param_string = param_string[1:-1]
        splitted     = param_string.split("=")
        keywork      = splitted[0]
        if len(splitted) == 2:
            return {keywork: splitted[1]}
        else:
            splitted.remove(keywork)
            values       = "=".join(splitted)
            return {keywork: get_value_list(values)}

def get_value_list(value_string):
    print("get_value_list input:", value_string)
    to_return = list()
    if "=" not in value_string and "(" not in value_string and ")" not in value_string:
        to_return.append(value_string)
    elif value_string[0] != "(":
        raise Exception("[ERROR] Format error '(' is not the first char: " + repr(value_string))
    else:
        parenth_count = 0
        strlen        = len(value_string)
        current_value = ""
        for i in range(0,strlen):
            current_char = value_string[i]
            current_value += current_char
            if current_char == "(":
                parenth_count += 1
            elif current_char == ")":
                parenth_count += -1
                if parenth_count == 0:
                    to_return.append(get_param(current_value))
                    if i != (strlen - 1):
                        if value_string[i+1] == "(":
                           to_return += get_value_list(value_string[i+1:])
                        else:
                            raise Exception("Format error - Next char should be a '('. value_string[i+1]:" + repr(value_string[i+1]) )
                    break
    print("get_value_list return:", to_return)
    if len(to_return) == 0:
        to_return = ""
    elif len(to_return) == 1:
        to_return = to_return[0]
    return to_return

connection_infos   = "(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=mydbserver.mydomain.com)(PORT=1521)))(CONNECT_DATA=(SID=CATAL)(SERVER=DEDICATED)(SERVICE_NAME=mydb.mydomain.com)))"
current_connection = get_param(connection_infos)
print("current_connection:", current_connection)
pprint(current_connection)

我得到了这个:
{'DESCRIPTION': [{'ADDRESS_LIST': {'ADDRESS': [{'PROTOCOL': 'TCP'},
                                               {'HOST': 'mydbserver.mydomain.com'},
                                                'PORT']
                                   }
                  },
                  'CONNECT_DATA'
                ]
}

所以我做错了什么。而且我感觉自己在做一些太复杂的事情。有人能指出我犯的错误或帮助我找到一个更简单的方法来做这个吗?


在我看来,你可以下载针对tnsnames.ora的ANTLR语法,然后让ANTLR为你生成解析器源代码。 - ibre5041
此外,看起来你的解析器不支持“include”子句。 - ibre5041
请查看这个Oracle页面:掌握Oracle+Python,第3部分:数据解析。查找“考虑解析tnsnames.ora文件”。 - Baumann
4个回答

1
你可以使用 pyparsing 来实现这个功能:
import pyparsing as pp

# 1. Literals
VAR = pp.Word(pp.alphas + "_", pp.alphanums + "_").setName('variable')
SPACE = pp.Suppress(pp.Optional(pp.White()))
EQUALS = SPACE + pp.Suppress('=') + SPACE
OPEN = pp.Suppress('(') + SPACE
CLOSE = pp.Suppress(')') + SPACE

INTEGER = pp.Optional('-') + pp.Word(pp.nums) + ~pp.Char(".")
INTEGER.setParseAction(lambda t: int(t[0]))

FLOAT = pp.Optional('-') + pp.Word(pp.nums) + pp.Char('.') + pp.Optional(pp.Word(pp.nums))
FLOAT.setParseAction(lambda t: float(t[0]))

STRING = pp.Word(pp.alphanums + r'_.-')

# 2. Literal assignment expressions: (IDENTIFIER = VALUE)
INTEGER_ASSIGNMENT = pp.Group(OPEN + VAR + EQUALS + INTEGER + CLOSE)
FLOAT_ASSIGNMENT = pp.Group(OPEN + VAR + EQUALS + FLOAT + CLOSE)
STRING_ASSIGNMENT = pp.Group(OPEN + VAR + EQUALS + STRING + CLOSE)

# 3. Nested object assignment
ASSIGNMENT = pp.Forward()
NESTED_ASSIGNMENT = pp.Group(OPEN + VAR + EQUALS + ASSIGNMENT + CLOSE)
ASSIGNMENT << pp.OneOrMore(INTEGER_ASSIGNMENT |
                           FLOAT_ASSIGNMENT |
                           STRING_ASSIGNMENT |
                           NESTED_ASSIGNMENT)

# 4. Net service name(s): NAME(.DOMAIN)[, NAME(.DOMAIN)...]
NET_SERVICE_NAME = pp.OneOrMore(pp.Word(pp.alphas + '_' + '.', pp.alphanums + '_' + '.')
                                + pp.Optional(pp.Suppress(',')))

# 5. Full TNS entry
TNS_ENTRY = NET_SERVICE_NAME + EQUALS + ASSIGNMENT

这里是一些示例数据:

TNS_NAMES_ORA = """
MYDB =
  (DESCRIPTION =
    (ADDRESS_LIST =
      (ADDRESS = (PROTOCOL = TCP)(HOST = server01)(PORT = 25881))
    )
    (CONNECT_DATA =
      (SID = MYDB01)
    )
  )

OTHERDB.DOMAIN, ALIAS_FOR_OTHERDB.DOMAIN = 
  (DESCRIPTION_LIST =
    (DESCRIPTION =
      (ADDRESS_LIST = (ADDRESS = (PROTOCOL = TCP)
                                 (HOST = server02)
                                 (PORT = 25881)
      ))
      (CONNECT_DATA = (SID = MYDB02))
    )
  )
"""

这部分有点特殊,但这是一个将一些额外的解析放在其上以提取所有数据的示例:

def _parse_addresses(tns_entry, addresses):
    """
    Parse ADDRESS keywords from the a TNS entry
    :param definition: Unparsed part of the TNS entry
    :param addresses: List of addresses parsed
    """
    keyword = tns_entry[0]
    # Base Case: We found an ADDRESS, so extract the data 
    # and do not recurse into it
    if keyword.upper() == 'ADDRESS':
        port = None
        host = None
        for k, v in tns_entry[1:]:
            if k == 'PORT':
                port = v
            elif k == 'HOST':
                host = v
        if port is None:
            print('WARNING: Ignoring ADDRESS due to missing PORT')
        elif host is None:
            print('WARNING: Ignoring ADDRESS due to missing HOST')
        addresses.append({'host': host, 'port': port})
    # Else recursively descend through the definition
    for d in tns_entry[1:]:
        # Only parse sub-lists, not literals
        if isinstance(d, list):
            _parse_addresses(d, addresses)

def _parse_connect_data(tns_entry, sids):
    """
    Parse CONNECT_DATA keywords from the a TNS entry
    :param definition: Unparsed part of the TNS entry
    :param sids: List of Oracle SIDs
    """
    keyword = tns_entry[0]
    # Base Case: We found a CONNECT_DATA, so extract the data 
    # and do not recurse into it
    if keyword.upper() == 'CONNECT_DATA':
        sid = None
        for k, v in tns_entry[1:]:
            if k == 'SID':
                sid = v
        if sid is None:
            print('WARNING: Ignoring CONNECT_DATA due to missing SID')
        sids.append(sid)
    for d in tns_entry[1:]:
        # Only parse sub-lists, not literals
        if isinstance(d, list):
            _parse_connect_data(d, sids)
        
def get_connection_info(net_service_name: str, tns_string: str):
    """
    Generator over all simple connections inferred from a TNS entry
    :param net_service_name: Net service name to return connection info for
    :param tns_string: tnsnames.ora file contents
    """
    # Parse the TNS entries and keep the requested definition
    definition = None
    for tokens, _start, _end in TNS_ENTRY.scanString(tns_string):
        if net_service_name in tokens.asList()[0]:
            definition = tokens.asList()[1]
            break
    # Check if we found a definition
    if definition is None:
        raise KeyError(f'No net service named {net_service_name}')
    # Look for all the ADDRESS keywords
    addresses = []
    _parse_addresses(definition, addresses)
    # Look for all CONNECT_DATA keywords
    sids = []
    _parse_connect_data(definition, sids)
    # Emit all combinations
    for address in addresses:
        for sid in sids:
            yield {'sid': sid, **address}

# Try it out!
for connection_info in get_connection_info('MYDB', TNS_NAMES_ORA):
    print(connection_info)

我在这里写了一篇关于编程方面的博客文章,只是出于“娱乐”: https://unparameterized.blogspot.com/2021/02/parsing-oracle-tns-files-in-python.html

1
我现在有一个工作的代码,但我并不满意。它太长了,不够灵活,并且无法与其他可能的 tnsnames.ora 格式配合使用。
class Tnsnames():
    def __init__(self, file_path, file_name='tnsnames.ora'):
        self.file_path  = file_path
        self.file_name  = file_name
        self.load_file()
    def load_file(self):
        try:
            fhd = open(os.path.join(self.file_path, self.file_name), 'rt', encoding='utf-8')
        except:
            raise
        else:
            #Oracle doc : https://docs.oracle.com/cd/B28359_01/network.111/b28317/tnsnames.htm#NETRF007
            file_content      = list()
            for l in fhd:
                l = " ".join(l.split()).strip(" \n")
                if len(l) > 0:
                    if "#" not in l:
                        file_content.append(l)
            fhd.close()
            file_content           = " ".join(file_content)
            connections_list       = dict()
            current_depth          = 0
            current_word           = ""
            current_keyword        = ""
            name_to_register       = ""
            is_in_add_list         = False
            current_addr           = dict()
            connections_aliases    = dict()
            stop_registering       = False
            connections_duplicates = list()
            for c in file_content:
                if c == " ":
                    pass
                elif c == "=":
                    current_keyword = str(current_word)
                    current_word    = ""
                    if current_keyword == "ADDRESS_LIST":
                        is_in_add_list = True
                elif c == "(":
                    if current_depth == 0:
                        current_keyword = current_keyword.upper()
                        names_list      = current_keyword.replace(" ","").split(",")
                        if len(names_list) == 1:
                            name_to_register = names_list[0]
                        else:
                            name_to_register = None
                            # We use either the first name with at least 
                            # a dot in it, or the longest one.
                            for n in names_list:
                                if "." in n:
                                    name_to_register = n
                                    break
                            else:
                                name_to_register = max(names_list, key=len)
                            names_list.remove(name_to_register)
                            for n in names_list:
                                if n in connections_aliases.keys():
                                    print("[ERROR] already registered alias:", n, 
                                          ". Registered to:", connections_aliases[n], 
                                          ". New:", name_to_register, 
                                          ". This possible duplicate will not be registered.")
                                    connections_duplicates.append(n) 
                                    stop_registering = True
                                else:
                                    connections_aliases[n] = name_to_register
                        if not stop_registering:
                            connections_list[name_to_register] = {"ADDRESS_LIST": list(), 
                                                                  "CONNECT_DATA": dict(),
                                                                  "LAST_TEST_TS": None}
                        current_depth += 1
                    elif current_depth in [1,2,3]:
                        current_depth += 1
                    else:
                        print("[ERROR] Incorrect depth:", repr(current_depth), ". Current connection will not be registered" )
                        del connections_list[name_to_register]
                        stop_registering = True
                elif c == ")":
                    if current_depth == 1:
                        if stop_registering:
                            stop_registering = False
                        else:
                            # Before moving to next connection,
                            # we check that current connection 
                            # have at least a HOST, and a SID or
                            # SERVICE_NAME
                            connection_is_valid = True
                            if isinstance(connections_list[name_to_register]["ADDRESS_LIST"], dict):
                                if "HOST" not in connections_list[name_to_register]["ADDRESS_LIST"].keys():
                                    print("[ERROR] Only one address defined, and no HOST defined. Current connection will not be registered:", name_to_register)
                                    connection_is_valid = False
                            elif isinstance(connections_list[name_to_register]["ADDRESS_LIST"], list):
                                for current_address in connections_list[name_to_register]["ADDRESS_LIST"]:
                                    if "HOST" in current_address.keys():
                                        break
                                else:
                                    print("[ERROR] Multiple addresses but none with HOST. Current connection will not be registered:", name_to_register)
                                    connection_is_valid = False
                            else:
                                print("[ERROR] Incorrect address format:", connections_list[name_to_register]["ADDRESS_LIST"], " Connection:", name_to_register)
                                connection_is_valid = False
                            if not connection_is_valid:
                                del connections_list[name_to_register]
                            else:
                                if "SERVICE_NAME" not in connections_list[name_to_register]["CONNECT_DATA"].keys() and \
                                   "SID" not in connections_list[name_to_register]["CONNECT_DATA"].keys():
                                    print("[ERROR] Missing SERVICE_NAME / SID for connection:", name_to_register)
                                    del connections_list[name_to_register]
                    elif current_depth == 2:
                        if is_in_add_list:
                            is_in_add_list = False
                            if not stop_registering:
                                if len(connections_list[name_to_register]["ADDRESS_LIST"]) == 1:
                                    connections_list[name_to_register]["ADDRESS_LIST"] = connections_list[name_to_register]["ADDRESS_LIST"][0]
                    elif current_depth == 3:
                        if is_in_add_list:
                            if not stop_registering:
                                connections_list[name_to_register]["ADDRESS_LIST"].append(current_addr)
                            current_addr   = dict()
                        elif current_keyword.upper() in ["SID", "SERVER", "SERVICE_NAME"]:
                            if not stop_registering:
                                connections_list[name_to_register]["CONNECT_DATA"][current_keyword.upper()] = current_word.upper()
                    elif current_depth == 4:
                        if is_in_add_list:
                            if not stop_registering:
                                current_addr[current_keyword.upper()] = current_word.upper()
                    current_keyword = ""
                    current_word    = ""
                    current_depth  += -1
                else:
                    current_word += c
        self.connections = connections_list
        self.aliases     = connections_aliases
        self.duplicates  = connections_duplicates

测试tnsnames.ora:

########################################
# This is a sample tnsnames.ora        #
########################################


###################################################
#  PRODDB
###################################################

proddb.mydbs.domain.com, PRODDB =
  (DESCRIPTION =
    (ADDRESS_LIST =
      (ADDRESS = (PROTOCOL = TCP)(HOST = proddb1.mydbs.domain.com)(PORT = 1522))
      (ADDRESS = (PROTOCOL = TCP)(HOST = proddb2.mydbs.domain.com)(PORT = 1522))
      (ADDRESS = (PROTOCOL = TCP)(HOST = proddb3.mydbs.domain.com)(PORT = 1522))
      (ADDRESS = (PROTOCOL = TCP)(HOST = proddb4.mydbs.domain.com)(PORT = 1522))
    )
    (CONNECT_DATA =
      (SID = PRODDB)
      (SERVER = DEDICATED)
      (SERVICE_NAME = proddb.mydbs.domain.com)
    )
  )

###################################################
#  DEVDBA : Test database for DBA usage 
###################################################

devdba.mydbs.domain.com, DEVDBA =
  (DESCRIPTION =
    (ADDRESS_LIST =
      (ADDRESS = (PROTOCOL = TCP)(HOST = devdba.mydbs.domain.com)(PORT = 1521))
    )
    (CONNECT_DATA =
      (SID = DEVDBA)
    )
  )

测试代码:

from pprint       import pprint
from lib_database import Tnsnames

tnsnnames = Tnsnames('/usr/lib/oracle/12.2/client64/network/admin')
print('Connexions:')
pprint(tnsnnames.connections)
print('Aliases:')
pprint(tnsnnames.aliases)
print('Duplicates:')
pprint(tnsnnames.duplicates)

输出:
Connexions:
    {'DEVDBA.MYDBS.DOMAIN.COM': {'ADDRESS_LIST': {'HOST': 'DEVDBA.MYDBS.DOMAIN.COM',
                                                  'PORT': '1521',
                                                  'PROTOCOL': 'TCP'},
                                 'CONNECT_DATA': {'SID': 'DEVDBA'},
     'PRODDB.MYDBS.DOMAIN.COM': {'ADDRESS_LIST': [{'HOST': 'PRODDB1.MYDBS.DOMAIN.COM',
                                                   'PORT': '1522',
                                                   'PROTOCOL': 'TCP'},
                                                  {'HOST': 'PRODDB2.MYDBS.DOMAIN.COM',
                                                   'PORT': '1522',
                                                   'PROTOCOL': 'TCP'},
                                                  {'HOST': 'PRODDB3.MYDBS.DOMAIN.COM',
                                                   'PORT': '1522',
                                                   'PROTOCOL': 'TCP'},
                                                  {'HOST': 'PRODDB4.MYDBS.DOMAIN.COM',
                                                   'PORT': '1522',
                                                   'PROTOCOL': 'TCP'}],
                                 'CONNECT_DATA': {'SERVER': 'DEDICATED',
                                                  'SERVICE_NAME': 'PRODDB.MYDBS.DOMAIN.COM',
                                                  'SID': 'PRODDB'}}
Aliases:
    {'DEVDBA': 'DEVDBA.MYDBS.DOMAIN.COM', 'PRODDB': 'PRODDB.MYDBS.DOMAIN.COM'}
Duplicates:
    []

我找不到其他用于解析 tnsnames.ora 文件的 Python 解析器。如果您知道其他解析器,请指引给我。


0
import re

def find_match(tns_regex, y):
    x1 = re.match(tns_regex, y, re.M + re.I + re.S)
    if x1 is not None:
        x1 = x1.groups(1)[0] # Only first match is returned
        x1 = x1.strip('\n')
    return(x1)


# Removing commented text
with open("C:\\Oracle\\product\\11.2.0\\client_1\\network\\admin\\tnsnames.ora") as tns_file:
    with open("test_tns.ora", 'w+') as output:
        lines =tns_file.readlines()
        for line in lines:
            if not line.startswith('#'):
                output.write(line)


with open('test_tns.ora') as tns_file:
    tnsnames = tns_file.read()


tnsnames1 = re.split(r"\){3,}\n\n", tnsnames)

# Regex matches
tns_name = '^(.+?)\s?\=\s+\(DESCRIPTION.*'
tns_host = '.*?HOST\s?=\s?(.+?)\)'
tns_port = '.*?PORT\s?=\s?(\d+?)\)'
tns_sname = '.*?SERVICE_NAME\s?=\s?(.+?)\)'
tns_sid = '.*?SID\s?=\s?(.+?)\)'

easy_connects = []

for y in tnsnames1:
    y = '%s))' % y

    l = [find_match(x, y) for x in [tns_name, tns_host, tns_port, tns_sname, tns_sid]]

    d = {
        'name': l[0],
        'host': l[1],
        'port': l[2],
        'service_name': l[3],
        'sid': l[4]
    }

    easy_connects.append(d)


print(easy_connects)

0

我写了这段小代码。它可以解析tnsnames.ora文件。速度很快,而且效果非常好。

def parse_ora_tns_file(fpath,tnskey=None,return_all_keys=False,view_file=False,logger=None)->str:
 
    """
    This function parse oracle tns file
    parms: fpath : full file path like
    fpath=filepath\tnsnames.ora
    param: tnskey: find tns entry for given tns key like tnskey='ABC.WORLD'
    param: return_all_keys: if True it will return all tns key names
    param: view_file : if True it returns tnsnames.ora as str
    """
    clean_tns_file=''
    if logger:
        logger.info('Reading tnsnames ora file at {} ...'.format(fpath))
    with open(fpath,mode='r') as tns_file:
 
        lines =tns_file.readlines()
        for line in lines:
            if not line.startswith('#'):
 
                clean_tns_file=clean_tns_file+line
 
 
 
 
 
    #clean file
    clean_str = clean_tns_file.replace('\n','')
    clean_str = clean_str.replace('\t','')
 
    #replace = with ' ' so later I can split with ' '
    #with below it becomes like ABC.WORLD = (DESCRIPTION) to ABC.WORLD ' ' (DESCRIPTION)
    clean_str = clean_str.replace('=',' ')
 
    #Below one output to ['ABC.WORLD','',' (DESCRIPTION)']
    lstresult= clean_str.split(" ")
    #Below code remove extra space from char list like it becomes ['ABC.WORLD','','(DESCRIPTION)']
    lstresult = [txt.strip() for txt in lstresult]
 
    #Below code to replace () chars to '' from list so output as ['ABC.WORLD','','DESCRIPTION']
    removetable = str.maketrans('', '', '()')
    out_list = [s.translate(removetable) for s in lstresult]
 
    #Below code remove any empty list items so output as ['ABC.WORLD','DESCRIPTION']
    out_list = list(filter(None, out_list))
 
 
 
    #find index of DESCRIPTION words
    indices = [i for i, x in enumerate(out_list ) if x.upper() == "DESCRIPTION"]
    tns_keys= ""
 
    for i in indices:
        #use index of DESCRIPTION to tns keys which is required for regx pattern below.
        tns_keys=tns_keys+out_list[i-1]+"|"
         
 
    if return_all_keys:
        return tns_keys.replace('|',',')[:-1]
 
    if logger:
        logger.info('Keys found in tnsnames ora: {}'.format(tns_keys))
    regex = r"\s+(?!^({}))".format(tns_keys)
    result = re.sub(regex, '', clean_tns_file, 0, re.MULTILINE)
 
    if view_file:
        return result
    if result:
        for match in re.finditer(r'^((?:{}))(.*)'.format(tns_keys), result, re.MULTILINE):
 
            if match.group(1) == tnskey:
                 
                 
                #removing = sign from start of entry
                if logger:
                    logger.info('Found tns entry: {} {}'.format(match.group(1),match.group(2)))
                return match.group(2)[1:]
 
    if logger:
        logger.info('No tns entry found for {}'.format(tnskey))
    return None

虽然这段代码可能回答了问题,但提供有关它如何以及/或为什么解决问题的附加上下文将改善答案的长期价值。 - Tomer Shetah
欢迎来到SO!请参观[tour]并阅读[answer]。然后根据该评论中相关的信息更新您的答案。 - Tomer Shetah
这个问题没有提供上下文说明为什么需要将TNS条目转换为字典。 - Sunil Bhambhani
例如,您可以详细说明原始代码为什么无法工作,以及您的代码与原始代码有何不同之处,使其能够正常工作。 - Tomer Shetah

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接