如何在SQLAlchemy中编写自己的方言以适应HTTP API?

5

我正在尝试将一个特殊的数据源添加到 Superset(一种数据探索平台)中。这个数据库只支持 HTTP API,并以 JSON 格式返回数据;例如:

> http://localhost/api/sql/query?q="select * from table"
< [{"id": 1, "value":10}, {"id": 2, "value": 30} ...]

因此,我需要为Superset编写自己的python SQLAlchemy适配器。我已经阅读了文档和部分源代码,但仍需要一个好的示例来跟随。
1个回答

5
我解决了这个问题。以下是我所做的事情。
  1. Go to ./site-packages/sqlalchemy/dialects

  2. Copy any concrete dialects to the new one (eg: named zeta) as the start point. A better way is to use

    from sqlalchemy.engine.default import DefaultDialect
    class ZetaDialect(DefaultDialect):
        ...
    
  3. Add zeta into __all__ section of ./site-packages/sqlalchemy/dialects/__init__.py

  4. Create a test program:

    from sqlalchemy import create_engine
    engine = create_engine('zeta://XXX')
    result = engine.execute("select * from table_name")
    for row in result:
        print(row)
  1. 运行程序时出现错误,使用pdb找到原因。在大多数情况下,原因是未实现某些接口。逐一解决。

  2. 当测试程序给出正确答案时,基本完成了90%。为了完整性,我们还应该实现几个检查器使用的接口:

    class ZetaDialect(DefaultDialect):
        # default_paramstyle = 'qmark'
        name = 'zeta'

        def __init__(self, **kwargs):
            DefaultDialect.__init__(self, **kwargs)

        @classmethod
        def dbapi(cls):
            return zeta_dbapi

        @reflection.cache
        def get_table_names(self, connection, schema=None, **kw):
            return [u'table_1', u'table_2', ...]

        @reflection.cache
        def get_pk_constraint(self, connection, table_name, schema=None, **kw):
            return []

        @reflection.cache
        def get_foreign_keys(self, connection, table_name, schema=None, **kw):
            return []

        @reflection.cache
        def get_unique_constraints(self, connection, table_name,
                                   schema=None, **kw):
            return []

        @reflection.cache
        def get_indexes(self, connection, table_name, schema=None, **kw):
            return []

        @reflection.cache
        def get_schema_names(self, connection, **kw):
            return []

        @reflection.cache
        def get_columns(self, connection, table_name, schema=None, **kw):
            # just an example of the column structure
            result = connection.execute('select * from %s limit 1' % table_name)
            return [{'default': None, 'autoincrement': False, 'type': TEXT, 'name': colname, 'nullable': False} for colname, coltype in result.cursor.description]

1
不要使用3,而是使用from sqlalchemy.dialects import registry registry.register("zeta", "myapp.dialect", "ZetaDialect") - Daniel Böckenhoff

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接