Чтобы заменить «слова, разделенные пробелами» в список слов, вам необходимо заменить:
lines1 = new.map(lambda x: (x, ))
с участием
lines1 = new.map(lambda line: line.split(' '))
Я попробовал это на своей машине, и после выполнения следующих
df = sqlContext.createDataFrame(lines1)
Был создан новый DF:
df.printSchema()
root
|-- _1: string (nullable = true)
|-- _2: string (nullable = true)
|-- _3: string (nullable = true)
|-- _4: string (nullable = true)
df.show()
+-------------+----+---+-----------------+
| _1| _2| _3| _4|
+-------------+----+---+-----------------+
| 1472128348.0|HTTP| -| tr.vwt.gsf.asfh|
|1472237494.63|HTTP| -| tr.sdf.sff.sdfg|
|1473297794.26|HTTP| -| tr.asfr.gdfg.sdf|
| 1474589345.0|HTTP| -|tr.sdgf.gdfg.gdfg|
| 1472038475.0|HTTP| -| tr.sdf.csgn.sdf|
+-------------+----+---+-----------------+
Вы можете выполнить groupBy:
>>> df2 = df.groupBy("_1")
>>> type(df2)
<class 'pyspark.sql.group.GroupedData'>
>>>
Чтобы использовать схему, вам необходимо сначала ее определить: см. https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html
Пример схемы можно найти ниже (вам нужно будет добавить поля и обновить имена, введите текст, чтобы адаптировать его к вашему случаю)
from pyspark.sql.types import *
schema = StructType([
StructField("F1", StringType(), True),
StructField("F2", StringType(), True),
StructField("F3", StringType(), True),
StructField("F4", StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
После этого вы сможете запустить его со схемой:
df = sqlContext.createDataFrame(lines1,schema)
А теперь у вас будут имена для полей:
df.show()
+-------------+----+---+-----------------+
| F1| F2| F3| F4|
+-------------+----+---+-----------------+
| 1472128348.0|HTTP| -| tr.vwt.gsf.asfh|
|1472237494.63|HTTP| -| tr.sdf.sff.sdfg|
|1473297794.26|HTTP| -| tr.asfr.gdfg.sdf|
| 1474589345.0|HTTP| -|tr.sdgf.gdfg.gdfg|
| 1472038475.0|HTTP| -| tr.sdf.csgn.sdf|
+-------------+----+---+-----------------+
чтобы сохранить его в CSV, вам нужно использовать to_pandas () и to_csv () (часть python pandas)
http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_csv.html
df.toPandas().to_csv('mycsv.csv')
содержимое файла csv:
cat mycsv.csv
,F1,F2,F3,F4
0,1472128348.0,HTTP,-,tr.vwt.gsf.asfh
1,1472237494.63,HTTP,-,tr.sdf.sff.sdfg
2,1473297794.26,HTTP,-,tr.asfr.gdfg.sdf
3,1474589345.0,HTTP,-,tr.sdgf.gdfg.gdfg
4,1472038475.0,HTTP,-,tr.sdf.csgn.sdf
Обратите внимание, что вы можете преобразовать столбец с помощью ".cast ()", например приведение F1 к типу float - добавление нового столбца с типом float и удаление старого столбца)
df = df.withColumn("F1float", df["F1"].cast("float")).drop("F1")
person
Yaron
schedule
01.09.2016
lines1.take(1)
- person Alberto Bonsanto   schedule 01.09.2016saveAsTextFile
метод для _2 _ тоже класс? Какая у вас версия Spark? - person Ajeet Shah   schedule 01.09.2016How to save in csv?
илиTypeError: Can not infer schema for type: type 'unicode'
? И, глядя на ваш входной файл и шаги, кажется,TypeError
проблемы нет. - person Ajeet Shah   schedule 01.09.2016