从NetCDF文件中加载数据到PostgreSQL数据库

3

我有一个包含八个变量的netCDF文件。(抱歉,无法分享实际文件) 每个变量都有两个维度:时间和站点。时间步长约为14步,而站点目前有38000个不同的ID。 因此,对于38000个不同的“位置”(实际上只是一个ID),我们有8个变量和14个不同的时间。

$ncdump -h stationdata.nc
netcdf stationdata {
dimensions:
    station = 38000 ;
    name_strlen = 40 ;
    time = UNLIMITED ; // (14 currently)
variables:
    int time(time) ;
            time:long_name = "time" ;
            time:units = "seconds since 1970-01-01" ;
    char station_name(station, name_strlen) ;
            station_name:long_name = "station_name" ;
            station_name:cf_role = "timeseries_id" ;
    float var1(time, station) ;
            var1:long_name = "Variable 1" ;
            var1:units = "m3/s" ;
    float var2(time, station) ;
            var2:long_name = "Variable 2" ;
            var2:units = "m3/s" ;
...

这些数据需要被加载到PostGres数据库中,以便将这些数据和站点名称匹配的几何图形连接起来,为后续的可视化做准备。

目前,我使用netCDF4模块在Python中完成了这个过程。虽然工作正常,但是很慢!现在我正在通过如下循环操作:

times = rootgrp.variables['time']
stations = rootgrp.variables['station_name']
for timeindex, time in enumerate(times):
    stations = rootgrp.variables['station_name']
    for stationindex, stationnamearr in enumerate(stations):
        var1val = var1[timeindex][stationindex]
        print "INSERT INTO ncdata (validtime, stationname, var1) \
            VALUES ('%s','%s', %s);" % \
            ( time, stationnamearr, var1val )

在我的机器上,运行这个需要几分钟的时间,我觉得它可以用更聪明的方式完成。

有没有人知道如何用更巧妙的方式实现?最好使用Python。


netCDF(.nc)是一种地理空间文件格式。对于任何搜索.nc文件结尾的人来说,这是一个很好的选择。 - questionto42
3个回答

3

我不确定这是否是正确的方法,但我找到了一个很好的解决方法,并认为我应该分享一下。

在第一个版本中,脚本运行大约需要一个小时。重写代码后,现在只需不到30秒就可以运行!

关键是使用numpy数组并将NetCDF读取器的变量数组转置为行,然后将所有列堆叠到一个矩阵中。然后使用psycopg2 copy_from函数将此矩阵加载到db中。我从这个问题中得到了代码:

使用二进制COPY表格从psycopg2

我的代码的部分内容:

dates = num2date(rootgrp.variables['time'][:],units=rootgrp.variables['time'].units)
var1=rootgrp.variables['var1']
var2=rootgrp.variables['var2']

cpy = cStringIO.StringIO()

for timeindex, time in enumerate(dates):

    validtimes=np.empty(var1[timeindex].size, dtype="object")
    validtimes.fill(time)

    #  Transponse and stack the arrays of parameters
    #    [a,a,a,a]        [[a,b,c],
    #    [b,b,b,b]  =>     [a,b,c],
    #    [c,c,c,c]         [a,b,c],
    #                      [a,b,c]]

    a = np.hstack((
              validtimes.reshape(validtimes.size,1),
              stationnames.reshape(stationnames.size,1),
              var1[timeindex].reshape(var1[timeindex].size,1),
              var2[timeindex].reshape(var2[timeindex].size,1)
    ))

    # Fill the cStringIO with text representation of the created array
    for row in a:
            cpy.write(row[0].strftime("%Y-%m-%d %H:%M")+'\t'+ row[1] +'\t' + '\t'.join([str(x) for x in row[2:]]) + '\n')


conn = psycopg2.connect("host=postgresserver dbname=nc user=user password=passwd")
curs = conn.cursor()

cpy.seek(0)
curs.copy_from(cpy, 'ncdata', columns=('validtime', 'stationname', 'var1', 'var2'))
conn.commit()

2
有一些简单的改进可以加速此过程。它们都是独立的,您可以尝试所有这些方法,也可以只尝试其中几个来查看是否足够快。以下是大致按难度递增的方法:
  • 使用 psycopg2 数据库驱动程序,它更快。
  • 将整个插入块包装在一个事务中。如果您正在使用 psycopg2,那么已经在自动打开一个事务,您只需要在最后进行commit即可。
  • 收集数行值到数组中,并在每 n 行执行多值插入。
  • 使用多个连接通过辅助进程进行插入 - 参见 multiprocessing 模块。线程由于全局解释器锁(GIL)问题而无法很好地工作。

如果您不想使用一个大型事务,您可以设置 synchronous_commit = off 并设置一个 commit_delay,以便连接可以在磁盘刷新实际完成之前返回。如果您在一个事务中完成所有工作,这对您没有太大帮助。

多值插入

Psycopg2 不直接支持多值 INSERT,但您可以编写以下代码:

curs.execute("""
INSERT INTO blah(a,b) VALUES
(%s,%s),
(%s,%s),
(%s,%s),
(%s,%s),
(%s,%s);
""", parms);

可以使用类似以下的循环:

parms = []
rownum = 0
for x in input_data:
    parms.extend([x.firstvalue, x.secondvalue])
    rownum += 1
    if rownum % 5 == 0:
        curs.execute("""INSERT ...""", tuple(parms))
        del(parms[:])

2

将循环组织起来,每次访问所有变量。换句话说,一次读写一个记录而不是一个变量。这样可以极大地提高速度,特别是如果源netCDF数据集存储在具有大磁盘块(例如1MB或更大)的文件系统上。关于为什么这样更快以及讨论数量级的加速效果,请参见此NCO加速讨论,从第7个条目开始。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接