需求:
- 上传约15MB的csv文件,然后使用
bulk_create
方法。 - 将一个任务的
id
分成10万条记录。 - 下一轮将删除现有的记录并再次使用
INSERT INTO
方法。
我的猜测:
我怀疑sequence_id
溢出是问题的根本原因,
因为之前它曾经能够上传,但现在失效了,无法再次上传。
这是我的postgres日志:
2017-06-23 04:55:21.087 UTC [27896] LOG: server process (PID 20529) was terminated by signal 9: Killed
2017-06-23 04:55:21.087 UTC [27896] DETAIL: Failed process was running: INSERT INTO "sales_sales" ("imc", "order_number", "original_order_date", "count") VALUES ('1049129', '415000458', '2017-03-01T03:00:00+00:00'::timestamptz, 1), ('1113804', '415000457', '2017-03-01T03:00:00+00:00'::timestamptz, 1), ('1151620', '415000460', '2017-03-01T03:00:00+00:00'::timestamptz, 1), ('1522771', '415000462', '2017-03-01T03:00:00+00:00'::timestamptz, 1), ('2280038', '415000459', '2017-03-01T03:00:00+00:00'::timestamptz, 1), ('7374979', '415000461', '2017-03-01T03:00:00+00:00'::timestamptz, 1), ('399428', '415000618', '2017-03-01T03:02:00+00:00'::timestamptz, 1), ('399428', '415000619', '2017-03-01T03:02:00+00:00'::timestamptz, 1), ('1049129', '415000614', '2017-03-01T03:02:00+00:00'::timestamptz, 1), ('1059455', '415000636', '2017-03-01T03:02:00+00:00'::timestamptz, 1), ('1059455', '415000638', '2017-03-01T03:02:00+00:00'::timestamptz, 1), ('1075963', '415000605', '2017-03-01T03:02:00+00:00'::timestamptz, 1), ('1113804', '415000607', '2017-03-01T03:02:00+00:00'::timestamptz, 1), ('1137600', '
2017-06-23 04:55:21.090 UTC [27896] LOG: terminating any other active server processes
2017-06-23 04:55:21.100 UTC [19656] WARNING: terminating connection because of crash of another server process
2017-06-23 04:55:21.100 UTC [19656] DETAIL: The postmaster has commanded this server process to roll back the current transaction and exit, because another server process exited abnormally and possibly corrupted shared memory.
2017-06-23 04:55:21.100 UTC [19656] HINT: In a moment you should be able to reconnect to the database and repeat your command.
2017-06-23 04:55:21.134 UTC [27896] LOG: all server processes terminated; reinitializing
2017-06-23 04:55:21.183 UTC [20539] LOG: database system was interrupted; last known up at 2017-06-23 04:51:40 UTC
2017-06-23 04:55:21.202 UTC [20540] postgres@uihspot FATAL: the database system is in recovery mode
2017-06-23 04:55:21.211 UTC [20541] postgres@uihspot FATAL: the database system is in recovery mode
更新我的情况,涉及到COPY问题
解决方案:
pip install django-postgres-copy
@transaction.atomic
def postgres_copy(instance: UploadedFile):
"""
Use COPY to do bulk INSERT INTO
:param instance:
:return:
"""
import time # PyCharm Bug 30May2017 It optimized and removed my line
start_time = time.time()
bkk = timezone(settings.TIME_ZONE)
urllib.request.urlretrieve(instance.file.url, "original.csv")
Sales.objects.all().delete()
with open("original.csv", 'rb') as source_file:
with open("utf8.tsv", 'w+b') as dest_file:
contents = source_file.read()
dest_file.write(contents.decode('utf-16').encode('utf-8'))
in_txt = csv.reader(open('./utf8.tsv', "r"), delimiter='\t')
out_csv = csv.writer(open('./utf8.csv', 'w'))
out_csv.writerows(in_txt)
sales = []
copy_mapping = CopyMapping(
Sales,
"./utf8.csv",
dict(
imc='IMC Number',
order_number='Order Number',
original_order_date='Original Order Date',
count='Demand Order Count'
)
)
copy_mapping.save()
result = time.time() - start_time
logger.info(msg=f"Total Execution save_sale_records time --- {result} seconds ---")
以及原始的版本
@transaction.atomic
def save_sale_records(instance: UploadedFile):
"""
This file will download from minio. Since TemporaryUploadedFile class is not a File class
Therefore it is not supported by csv reader. Python function read from real object
:param instance:
:return:
"""
import time # PyCharm Bug 30May2017 It opmized and removed my line
start_time = time.time()
bkk = timezone(settings.TIME_ZONE)
urllib.request.urlretrieve(instance.file.url, "original.csv")
Sales.objects.all().delete()
with open("original.csv", 'rb') as source_file:
with open("utf8.csv", 'w+b') as dest_file:
contents = source_file.read()
dest_file.write(contents.decode('utf-16').encode('utf-8'))
sales = []
with open("utf8.csv") as csv_file:
reader = csv.reader(csv_file, dialect="excel-tab")
for index, row in enumerate(reader):
"""
OrderedDict([
('\ufeffWarehouse Code', '41CL'),
('Warehouse Desc', 'แอมเวย์ ช็อป สีลม'),
('IMC Number', '1113804'),
('Order Number', '415000457'),
('Original Order Date', '2017-03-01 00:00:00'),
('Order 24 Hour Min', '09:42'),
('Demand Order Count', '1')])
"""
if index == 0:
continue
# Multiple lines for maintainer
order_date = row[4].split(" ")[0]
order_time = row[5]
order_datetime = order_date + "-" + order_time
date_obj = datetime.strptime(order_datetime, "%m/%d/%y-%H:%M").replace(tzinfo=bkk)
utc_date = date_obj.astimezone(pytz.utc)
sale = Sales(
imc=row[2],
order_number=row[3],
original_order_date=utc_date,
count=row[6]
)
sales.append(sale)
Sales.objects.bulk_create(sales)
result = time.time() - start_time
logger.info(msg=f"Total Execution save_sale_records time --- {result} seconds ---")