Как вывести данные искры в файл csv с отдельными столбцами?

Мой код 1st извлекает данные с помощью регулярного выражения и записывает эти данные в текстовый файл (строковый формат). Затем я попытался создать фрейм данных из содержимого текстового файла, чтобы у меня были отдельные столбцы, что привело к ошибке. (Запись в файл csv записывает все в один столбец).

with open("C:\\Sample logs\\dataframe.txt",'a') as f:
    f.write(str(time))
    f.write(" ")
    f.write(qtype)
    f.write(" ")
    f.write(rtype)
    f.write(" ")
    f.write(domain)
    f.write("\n")
 new = sc.textFile("C:\\Sample logs\\dataframe.txt").cache() # cause df requires an rdd
 lines1 = new.map(lambda x: (x, ))
 df = sqlContext.createDataFrame(lines1)

Но я получаю следующую ошибку:

TypeError: невозможно вывести схему для типа: type 'unicode'

Я пробовал другие способы, но не помогло. Все, что я хочу сделать, это то, что после выполнения операции записи я хочу создать фрейм данных с отдельными столбцами, чтобы использовать groupBy ().

Вход в текстовый файл:

1472128348.0 HTTP - tr.vwt.gsf.asfh
1472237494.63 HTTP - tr.sdf.sff.sdfg
1473297794.26 HTTP - tr.asfr.gdfg.sdf
1474589345.0 HTTP - tr.sdgf.gdfg.gdfg
1472038475.0 HTTP - tr.sdf.csgn.sdf

Ожидаемый результат в формате csv:

То же, что и выше, но разделено на столбцы, чтобы я мог выполнять групповые операции.


person kaks    schedule 01.09.2016    source источник
comment
Не могли бы вы сделать lines1.take(1)   -  person Alberto Bonsanto    schedule 01.09.2016
comment
приведите пример входных данных и ожидаемую структуру фрейма данных   -  person Yaron    schedule 01.09.2016
comment
Есть ли saveAsTextFile метод для _2 _ тоже класс? Какая у вас версия Spark?   -  person Ajeet Shah    schedule 01.09.2016
comment
@AlbertoBonsanto: возвращает первую строку из ввода   -  person kaks    schedule 01.09.2016
comment
@Yaron: Я добавил это к вопросу   -  person kaks    schedule 01.09.2016
comment
@Ajeets: он наверняка выдаст ошибку позже, если предыдущая строка пройдет. Просто добавил, чтобы узнать об ошибках.   -  person kaks    schedule 01.09.2016
comment
@kaks В чем именно проблема? How to save in csv? или TypeError: Can not infer schema for type: type 'unicode'? И, глядя на ваш входной файл и шаги, кажется, TypeError проблемы нет.   -  person Ajeet Shah    schedule 01.09.2016


Ответы (1)


Чтобы заменить «слова, разделенные пробелами» в список слов, вам необходимо заменить:

lines1 = new.map(lambda x: (x, ))

с участием

 lines1 = new.map(lambda line: line.split(' '))

Я попробовал это на своей машине, и после выполнения следующих

df = sqlContext.createDataFrame(lines1)

Был создан новый DF:

df.printSchema()
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)
 |-- _4: string (nullable = true)

df.show()
+-------------+----+---+-----------------+
|           _1|  _2| _3|               _4|
+-------------+----+---+-----------------+
| 1472128348.0|HTTP|  -|  tr.vwt.gsf.asfh|
|1472237494.63|HTTP|  -|  tr.sdf.sff.sdfg|
|1473297794.26|HTTP|  -| tr.asfr.gdfg.sdf|
| 1474589345.0|HTTP|  -|tr.sdgf.gdfg.gdfg|
| 1472038475.0|HTTP|  -|  tr.sdf.csgn.sdf|
+-------------+----+---+-----------------+

Вы можете выполнить groupBy:

>>> df2 = df.groupBy("_1")
>>> type(df2)
<class 'pyspark.sql.group.GroupedData'>
>>> 

Чтобы использовать схему, вам необходимо сначала ее определить: см. https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

Пример схемы можно найти ниже (вам нужно будет добавить поля и обновить имена, введите текст, чтобы адаптировать его к вашему случаю)

from pyspark.sql.types import *
schema = StructType([
    StructField("F1", StringType(), True),
    StructField("F2", StringType(), True),
    StructField("F3", StringType(), True),
    StructField("F4", StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)

После этого вы сможете запустить его со схемой:

df = sqlContext.createDataFrame(lines1,schema)

А теперь у вас будут имена для полей:

df.show()
+-------------+----+---+-----------------+
|           F1|  F2| F3|               F4|
+-------------+----+---+-----------------+
| 1472128348.0|HTTP|  -|  tr.vwt.gsf.asfh|
|1472237494.63|HTTP|  -|  tr.sdf.sff.sdfg|
|1473297794.26|HTTP|  -| tr.asfr.gdfg.sdf|
| 1474589345.0|HTTP|  -|tr.sdgf.gdfg.gdfg|
| 1472038475.0|HTTP|  -|  tr.sdf.csgn.sdf|
+-------------+----+---+-----------------+

чтобы сохранить его в CSV, вам нужно использовать to_pandas () и to_csv () (часть python pandas)

http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_csv.html

df.toPandas().to_csv('mycsv.csv')

содержимое файла csv:

cat mycsv.csv

,F1,F2,F3,F4
0,1472128348.0,HTTP,-,tr.vwt.gsf.asfh
1,1472237494.63,HTTP,-,tr.sdf.sff.sdfg
2,1473297794.26,HTTP,-,tr.asfr.gdfg.sdf
3,1474589345.0,HTTP,-,tr.sdgf.gdfg.gdfg
4,1472038475.0,HTTP,-,tr.sdf.csgn.sdf

Обратите внимание, что вы можете преобразовать столбец с помощью ".cast ()", например приведение F1 к типу float - добавление нового столбца с типом float и удаление старого столбца)

df = df.withColumn("F1float", df["F1"].cast("float")).drop("F1")
person Yaron    schedule 01.09.2016
comment
Благодаря тонну! Это сработало! Кстати, когда я использую схему, для StructField FloatType записывает только нулевые значения. StringType не имеет проблем. Но знаете ли вы, почему FloatType копирует null на мой лист Excel? - person kaks; 02.09.2016
comment
когда я обновил свою схему для использования FloatType в F1, я получил следующую ошибку: TypeError: FloatType не может принять объект типа ‹type 'unicode'›. Я не уверен, почему он был идентифицирован как юникод. см. обновление в моем ответе относительно .cast () - person Yaron; 04.09.2016
comment
Когда я сделал кастинг, новый столбец не был добавлен, но старый F1 был удален. - person kaks; 06.09.2016
comment
попробуйте следующее: перед приведением: df.show (), приведение: df = df.withColumn (F1float, df [F1] .cast (float)), после преобразования: df.show () - дайте мне знать, если добавлен новый столбец - person Yaron; 06.09.2016
comment
Привет .. я пробовал это. Теперь добавлен новый столбец. Но в новом столбце повторяется только одно значение, в отличие от F1, у которого было множество значений. Похоже, что произошло какое-то преобразование, и было добавлено только одно значение (которого нет в F1). - person kaks; 06.09.2016