-
Notifications
You must be signed in to change notification settings - Fork 0
/
tes_local.py
43 lines (32 loc) · 951 Bytes
/
tes_local.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from pyspark.sql import SparkSession
import os
# Local-mode setup: point both the PySpark driver and worker processes
# at the same local Anaconda interpreter.
_LOCAL_PYTHON = "D://env//anaconda3//python.exe"
for _env_var in ("PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON"):
    os.environ[_env_var] = _LOCAL_PYTHON

# Build a local SparkSession with Hive support enabled.
spark = (
    SparkSession.builder
    .appName("tes")
    .master('local[*]')
    .enableHiveSupport()
    .getOrCreate()
)
# Eager evaluation so DataFrames render when echoed in a REPL/notebook.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

sc = spark.sparkContext
sc.setLogLevel('ERROR')  # keep console output quiet; only errors are shown

# Small demo DataFrame: map<str,int>, map<str,str>, string, and a date-string column.
_demo_rows = [
    ({"1": 1}, {"1": "1"}, "1", '20220606'),
    ({"2": 2}, {"2": "2"}, "2", '20220606'),
    ({"3": 3}, {"3": "3"}, "3", '20220606'),
]
spark_df = spark.createDataFrame(_demo_rows, ['a', 'b', 'c', 'dt'])
spark_df.show()
def process_func(partitions):
    """Consume one RDD partition and yield a single list of its rows' 'dt' values.

    Each row is expected to support ``asDict()`` and ``row['dt']`` access
    (e.g. a pyspark Row). The per-row prints are debug output.
    """
    dt_values = []
    for record in partitions:
        print(record.asDict())
        print(record['dt'])
        dt_values.append(record['dt'])
    # One list per partition, so mapPartitions produces a list-of-lists overall.
    yield dt_values
# Repartition to 4 and run process_func once per partition on the executors.
# Pass process_func directly: wrapping it in `lambda x: process_func(x)` was
# a redundant indirection with identical behavior.
# Result is a list of per-partition lists of 'dt' values.
data = spark_df.rdd.repartition(4).mapPartitions(process_func).collect()
print(data)