使用Python导入Salesforce报告数据

4

我刚开始接触sfdc。用户已经创建了报告,我想使用Python将报告的数据导出到csv/excel文件中。我看到有几个Python包可以实现此功能。但是我的代码出错了。

from simple_salesforce import Salesforce
sf = Salesforce(instance_url='https://cs1.salesforce.com', session_id='')
sf = Salesforce(password='xxxxxx', username='xxxxx', organizationId='xxxxx')

我可以为您提供设置API的基本步骤以及一些示例代码:

4个回答

7
这对我有用:
import requests
import csv
from simple_salesforce import Salesforce
import pandas as pd


sf = Salesforce(username=your_username, password=your_password, security_token = your_token)


login_data = {'username': your_username, 'password': your_password_plus_your_token}


with requests.session() as s:
    d = s.get("https://your_instance.salesforce.com/{}?export=1&enc=UTF-8&xf=csv".format(reportid), headers=sf.headers, cookies={'sid': sf.session_id})

d.content将包含一个逗号分隔值的字符串,您可以使用csv模块读取该字符串。

我从那里将数据带入pandas,因此函数名称和import pandas。 我删除了将数据放入DataFrame的其余部分,但如果您对如何完成此操作感兴趣,请告诉我。


我知道这可能有点老了,但我非常想知道您是如何将d.content转换为数据框/CSV文件的。 - Rukgo
2
@Rukgo:我现在使用https://pypi.python.org/pypi/salesforce-reporting/0.1.3,这是一个专门的包。SFDC从csv查询返回“类似表格”的数据,而pandas(和Excel)足够聪明以适应它,所以我直接将其加载到pandas中。 - Obol
1
@Odol 我研究了一下,但最终选择了 s.get,因为它会在输出中包含头信息。 - Rukgo
1
我认为这种方法已经不再适用了,请尝试这个 https://dev59.com/GLXna4cB1Zd3GeqPRNN_#57585405 - Sho arata

4
如果有帮助的话,我想写出我现在(2018年8月)回答这个问题所用的步骤,基于Obol的评论。 供参考,我按照 README 说明在 https://github.com/cghall/force-retrieve/blob/master/README.md 上使用salesforce_reporting包。
要连接到Salesforce:
from salesforce_reporting import Connection, ReportParser

sf = Connection(username='your_username',password='your_password',security_token='your_token')

然后,为了将我想要的报告转换成Pandas DataFrame:

report = sf.get_report(your_reports_id)
parser = salesforce_reporting.ReportParser(report)
report = parser.records_dict()
report = pd.DataFrame(report)

如果您愿意的话,您还可以将上面的四行内容简化为一行,像这样:
report = pd.DataFrame(salesforce_reporting.ReportParser(sf.get_report(your_reports_id)).records_dict())

我在阅读README时发现一个不同之处,即sf.get_report('report_id', includeDetails=True)引发了错误,指出get_report() got an unexpected keyword argument 'includeDetails'。简单地将其删除似乎导致代码正常工作。

report现在可以通过report.to_csv('report.csv',index=False)导出,或者直接进行操作。

编辑:parser.records()已更改为parser.records_dict(),因为这样允许DataFrame已列出列名,而不是对它们进行数字索引。


1
这份报告/响应是否告诉您是否有超过2000行?或者您不需要担心这个限制,因为您正在获取完整的CSV文件?谢谢。 - tkansara
1
我达到了2k的限制,我猜这适用于这个解决方案。 - malber
1
你知道如何获取超过2k行吗? - Sho arata
有人知道绕过2k限制的解决方案吗? - aero8991

1
下面的代码可能相对较长,只适用于我们的用例,但基本思路如下:找出日期间隔长度和需要的额外过滤器,以避免超过“超过2,000”限制。在我的情况下,我可以有一个每周日期范围过滤器,但需要应用一些额外的过滤器。然后像这样运行它:
report_id = '00O4…'
sf = SalesforceReport(user, pass, token, report_id)
it = sf.iterate_over_dates_and_filters(datetime.date(2020,2,1),
     'Invoice__c.InvoiceDate__c', 'Opportunity.CustomField__c', 
     [('a', 'startswith'), ('b', 'startswith'), …])
for row in it:
  # do something with the dict

迭代器会在自2020-02-01以后每周(如果需要每日或每月的迭代器,则需要更改代码,但更改应该很小)应用过滤器CustomField__c.startswith('a'),然后是CustomField__c.startswith('b')等,并且作为生成器,因此您不需要自己处理过滤器循环。
如果查询返回超过2000行数据,则迭代器会抛出异常,以确保数据不会不完整。
这里有一个警告:SF每小时最多只能进行500个查询。假设您有52周和10个额外的过滤器的一年时间,您已经达到了限制。
以下是类(依赖于simple_salesforce):
import simple_salesforce
import json
import datetime

"""
helper class to iterate over salesforce report data
and manouvering around the 2000 max limit
"""

class SalesforceReport(simple_salesforce.Salesforce):
  def __init__(self, username, password, security_token, report_id):
    super(SalesforceReport, self).__init__(username=username, password=password, security_token=security_token)
    self.report_id = report_id
    self._fetch_describe()

  def _fetch_describe(self):
    url = f'{self.base_url}analytics/reports/{self.report_id}/describe'
    result = self._call_salesforce('GET', url)
    self.filters = dict(result.json()['reportMetadata'])

  def apply_report_filter(self, column, operator, value, replace=True):
    """
    adds/replaces filter, example:
    apply_report_filter('Opportunity.InsertionId__c', 'startsWith', 'hbob').
    For date filters use apply_standard_date_filter.

    column:   needs to correspond to a column in your report, AND the report
              needs to have this filter configured (so in the UI the filter
              can be applied)
    operator: equals, notEqual, lessThan, greaterThan, lessOrEqual,
              greaterOrEqual, contains, notContain, startsWith, includes
              see https://sforce.co/2Tb5SrS for up to date list
    value:    value as a string
    replace:  if set to True, then if there's already a restriction on column
              this restriction will be replaced, otherwise it's added additionally
    """
    filters = self.filters['reportFilters']
    if replace:
      filters = [f for f in filters if not f['column'] == column]
    filters.append(dict(
      column=column, 
      isRunPageEditable=True, 
      operator=operator, 
      value=value))
    self.filters['reportFilters'] = filters

  def apply_standard_date_filter(self, column, startDate, endDate):
    """
    replace date filter. The date filter needs to be available as a filter in the
    UI already

    Example: apply_standard_date_filter('Invoice__c.InvoiceDate__c', d_from, d_to)

    column: needs to correspond to a column in your report
    startDate, endDate: instance of datetime.date
    """
    self.filters['standardDateFilter'] = dict(
      column=column,
      durationValue='CUSTOM',
      startDate=startDate.strftime('%Y-%m-%d'),
      endDate=endDate.strftime('%Y-%m-%d')
    )

  def query_report(self):
    """
    return generator which yields one report row as dict at a time
    """
    url = self.base_url + f"analytics/reports/query"
    result = self._call_salesforce('POST', url, data=json.dumps(dict(reportMetadata=self.filters)))
    r = result.json()
    columns = r['reportMetadata']['detailColumns']
    if not r['allData']:
      raise Exception('got more than 2000 rows! Quitting as data would be incomplete')
    for row in r['factMap']['T!T']['rows']:
      values = []
      for c in row['dataCells']:
        t = type(c['value'])
        if t == str or t == type(None) or t == int:
          values.append(c['value'])
        elif t == dict and 'amount' in c['value']:
          values.append(c['value']['amount'])
        else:
          print(f"don't know how to handle {c}")
          values.append(c['value'])
      yield dict(zip(columns, values))

  def iterate_over_dates_and_filters(self, startDate, date_column, filter_column, filter_tuples):
    """
    return generator which iterates over every week and applies the filters 
    each for column
    """
    date_runner = startDate
    while True:
      print(date_runner)
      self.apply_standard_date_filter(date_column, date_runner, date_runner + datetime.timedelta(days=6))
      for val, op in filter_tuples:
        print(val)
        self.apply_report_filter(filter_column, op, val)
        for row in self.query_report():
          yield row
      date_runner += datetime.timedelta(days=7)
      if date_runner > datetime.date.today():
        break

谢谢您发布这个问题!您知道如何突破2k的限制吗? - aero8991

0

对于任何想要将报告下载到DataFrame中的人,以下是操作步骤(我添加了一些注释和链接以进行说明):

import pandas as pd
import csv
import requests
from io import StringIO
from simple_salesforce import Salesforce

# Input Salesforce credentials:
sf = Salesforce(
    username='johndoe@mail.com', 
    password='<password>', 
    security_token='<security_token>') # See below for help with finding token 

# Basic report URL structure:
orgParams = 'https://<INSERT_YOUR_COMPANY_NAME_HERE>.my.salesforce.com/' # you can see this in your Salesforce URL
exportParams = '?isdtp=p1&export=1&enc=UTF-8&xf=csv'

# Downloading the report:
reportId = 'reportId' # You find this in the URL of the report in question between "Report/" and "/view"
reportUrl = orgParams + reportId + exportParams
reportReq = requests.get(reportUrl, headers=sf.headers, cookies={'sid': sf.session_id})
reportData = reportReq.content.decode('utf-8')
reportDf = pd.read_csv(StringIO(reportData))

您可以按照此页面底部的说明获取您的令牌。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接