我有一个pyspark作业,本地运行时没有任何问题,但是当它在AWS集群上运行时,它在到达以下代码时就会卡住。该作业只处理100条记录。 "some_function"将数据发布到网站并在最后返回响应。有什么想法出了什么问题或者我该如何调试?另外,"Some_function"在类的外部,我猜问题与["闭包"][1]有关,但不确定如何解决。
response = attributes.mapPartitions(lambda iter: [some_func1(map(lambda x: x, xs), dict_name, id) for xs in partition_all(50, iter)]).collect()
完整代码如下
def ctgs(entries):
    """Convert one 3-column row into a plain record dict.

    Args:
        entries: an indexable row with at least three elements
            (col1, col2, col3).

    Returns:
        dict mapping the upstream field names to the row values.
    """
    col1 = entries[0]
    col2 = entries[1]
    col3 = entries[2]
    # BUG FIX: the original used bare names (up_col1: col1, ...) which are
    # undefined identifiers and raise NameError on the executors — dict
    # keys must be string literals here.
    rec = {
        'up_col1': col1,
        'up_col2': col2,
        'up_col3': col3,
    }
    return rec
def some_func1(rec, dict_name, id):
    """Serialize one batch of records and POST it to the attribute endpoint.

    Args:
        rec: iterable of record dicts (one partition batch).
        dict_name: auxiliary mapping carried along by the caller
            (unused in the visible body — kept for interface parity).
        id: session/entity id for this batch
            (assigned but unused in the original — presumably meant to go
            into the payload; TODO confirm with the author).

    Returns:
        The requests.Response returned by the POST.
    """
    # Materialize the (possibly lazy) batch so json.dumps can serialize it.
    rec_list = list(rec)
    seid = id  # NOTE(review): unused — see docstring.
    # Placeholders from the question. NOTE(review): requests expects
    # `headers` to be a dict of header-name -> value, and the URL needs an
    # explicit http(s):// scheme — confirm real values before running.
    headers = "some header"
    attrburl = "www.someurl.com"
    # BUG FIX: the original line had mismatched parentheses and left
    # `headers` outside the call:
    #   requests.post(attrburl, data=json.dumps(rec_list)), headers)
    response = requests.post(attrburl, data=json.dumps(rec_list), headers=headers)
    return response
class Processor:
    """Extracts rows from Hive and POSTs them in batches of 50 per partition.

    NOTE(review): the original hang on the AWS cluster is most plausibly
    explained by the executors being unable to reach the target URL
    (firewall / VPC egress) — wrap the requests.post call in try/except and
    verify connectivity from a worker node. Collecting raw
    requests.Response objects back to the driver is also fragile, since
    they are not reliably picklable; consider returning response.status_code
    or response.text instead — TODO confirm.
    """

    def __init__(self, sc, arguments):
        """Store the SparkSession and parsed CLI arguments.

        BUG FIX: the original spelled this `__init`, so it was never
        invoked as the constructor and the attributes below were missing.
        """
        self.sc = sc
        self.env = arguments.env
        self.dte = arguments.dte
        self.sendme = arguments.sendme

    def send_them(self, ext_data, dict_name, id):
        """Map rows to record dicts and POST them in 50-row batches.

        BUG FIXES vs the original: added the missing `self` parameter,
        quoted the `x['col3']` key, passed ctgs a single tuple (it takes
        one `entries` argument), and corrected the `some_fuc1` typo.
        """
        attributes = ext_data.rdd.map(
            lambda x: ctgs((x['col1'], x['col2'], x['col3']))
        )
        # dict_name and id are plain picklable values, so closure capture
        # here is safe; the lambda must not capture `self` (it would drag
        # the SparkSession into the closure and fail to serialize).
        response = attributes.mapPartitions(
            lambda it: [some_func1(list(xs), dict_name, id)
                        for xs in partition_all(50, it)]
        ).collect()
        return response

    def extr_data(self, dict_name, id):
        """Run the extraction query and hand the frame to send_them.

        BUG FIXES: added `self`; the original referenced an undefined
        global `spark` (use the stored session) and called send_them as a
        bare name with undefined `dict_name`/`id` — they are now parameters.
        """
        ext_data = self.sc.sql('''select col1, col2, col3 from table_name''')
        return self.send_them(ext_data, dict_name, id)

    def process(self):
        """Top-level driver: build the constants and kick off extraction."""
        # BUG FIX: `dict_id` was a bare undefined name — it must be a
        # string key.
        dict_name = {'dict_id': '34343-3434-3433-343'}
        id = 'dfdfd-erere-dfd'
        return self.extr_data(dict_name, id)
def argument_parsing(args):
    """Parse the job's command-line arguments.

    Args:
        args: list of raw CLI tokens (e.g. sys.argv[1:]).

    Returns:
        argparse.Namespace with `env`, `dte` (both required) and the
        optional `sendme`.
    """
    # Local import so this block is self-contained; no top-of-file import
    # section is visible in this snippet.
    import argparse

    # BUG FIX: the original used `parser` without ever constructing it.
    parser = argparse.ArgumentParser(description="job_name arguments")
    parser.add_argument("--env", required=True)
    parser.add_argument("--dte", required=True)
    parser.add_argument("--sendme", required=False)
    args = parser.parse_args(args)
    return args
def main(args):
    """Entry point: parse arguments, build a Spark session, run the job."""
    parsed = argument_parsing(args)
    # Hive support is required because the job reads from a Hive table.
    session = (
        SparkSession.builder
        .appName("job_name")
        .enableHiveSupport()
        .getOrCreate()
    )
    session.sparkContext.setLogLevel("ERROR")
    Processor(session, parsed).process()
建议把 requests.post() 调用放进 try-except 块中,并手动检查从 AWS 集群是否能访问 attrburl(目标网址),或者是否有防火墙规则阻止了出站连接。 - zweack