Разделить файл CSV на равные части?

У меня есть большой CSV-файл, который я хотел бы разделить на число, равное количеству ядер ЦП в системе. Затем я хочу использовать многопроцессорность, чтобы все ядра работали над файлом вместе. Однако у меня возникают проблемы даже с разделением файла на части. Я просмотрел весь google и нашел пример кода, который делает то, что я хочу. Вот что у меня есть до сих пор:

def split(infilename, num_cpus=multiprocessing.cpu_count()):
    READ_BUFFER = 2**13
    total_file_size = os.path.getsize(infilename)
    print total_file_size
    files = list()
    with open(infilename, 'rb') as infile:
        for i in xrange(num_cpus):
            files.append(tempfile.TemporaryFile())
            this_file_size = 0
            while this_file_size < 1.0 * total_file_size / num_cpus:
                files[-1].write(infile.read(READ_BUFFER))
                this_file_size += READ_BUFFER
        files[-1].write(infile.readline()) # get the possible remainder
        files[-1].seek(0, 0)
    return files

files = split("sample_simple.csv")
print len(files)

for ifile in files:
    reader = csv.reader(ifile)
    for row in reader:
        print row

Два отпечатка показывают правильный размер файла и то, что он был разделен на 4 части (моя система имеет 4 ядра процессора).

Однако последний раздел кода, который печатает все строки в каждой из частей, дает ошибку:

for row in reader:
_csv.Error: line contains NULL byte

Я попытался распечатать строки без запуска функции разделения, и все значения печатаются правильно. Я подозреваю, что функция разделения добавила несколько байтов NULL к результирующим 4 частям файла, но я не уверен, почему.

Кто-нибудь знает, правильный ли это и быстрый способ разбить файл? Мне просто нужны результирующие фрагменты, которые могут быть успешно прочитаны csv.reader.


person Colin    schedule 19.06.2015    source источник
comment
У вас есть нулевые байты в вашем файле? Распечатайте строки с repr   -  person Padraic Cunningham    schedule 20.06.2015
comment
Могу ли я предположить, что нет, поскольку печать строк исходного файла без разделения прошла успешно?   -  person Colin    schedule 20.06.2015
comment
Простым методом было бы получить количество строк и разбить файл на n фрагментов.   -  person Padraic Cunningham    schedule 20.06.2015
comment
Вы не можете разделить CSV-файл в какой-то произвольной точке, формат файла ориентирован на строки, поэтому любое разделение должно происходить на границе между строками, что означает, что вы знаете, где они находятся.   -  person martineau    schedule 20.06.2015
comment
@Colin, если разделение на строки достаточно хорошо pastebin.com/xR39xkhi   -  person Padraic Cunningham    schedule 20.06.2015
comment
Вы просили разделить файлы CSV, и ответы уже есть. Однако вы также обосновали использование всех ядер ЦП. Два балла по этому поводу. Вы должны проверить, является ли файловый ввод-вывод или обработка чисел вашим узким местом. Вы знаете о глобальной блокировке интерпретатора?   -  person stefan    schedule 20.06.2015
comment
@PadraicCunningham Да, разделение по строкам меня устраивает, если в конце остаются файлы #core со всеми неповрежденными данными. Что делает «1 вместо _»? Как насчет «lines[-1] += islice(f, None)»? Спасибо.   -  person Colin    schedule 22.06.2015
comment
@stefan Как я могу подтвердить, что ввод-вывод или обработка чисел являются моим узким местом? Я подозревал, что это так, поскольку основная часть времени моей программы уходит на обработку CSV-файла построчно. Что я делаю, так это вычисляю умножение значений из двух столбцов + скользящее среднее этого результата.   -  person Colin    schedule 22.06.2015


Ответы (1)


Как я уже сказал в комментарии, CSV-файлы должны быть разделены по границам строк (или строк). Ваш код этого не делает и потенциально разбивает их где-то посередине, что, как я подозреваю, является причиной вашего _csv.Error.

Следующее позволяет избежать этого, обрабатывая входной файл как серию строк. Я протестировал его, и, похоже, он работает автономно в том смысле, что он разделил образец файла на фрагменты примерно одинакового размера, потому что маловероятно, что целое число строк точно поместится в фрагмент.

Обновить

Это существенно более быстрая версия кода, чем я изначально опубликовал. Улучшение заключается в том, что теперь он использует собственный метод tell() временного файла для определения постоянно меняющейся длины файла по мере его записи вместо вызова os.path.getsize(), что устраняет необходимость flush() файла и вызова os.fsync() для него после записи каждой строки.

import csv
import multiprocessing
import os
import tempfile

def split(infilename, num_chunks=multiprocessing.cpu_count()):
    READ_BUFFER = 2**13
    in_file_size = os.path.getsize(infilename)
    print 'in_file_size:', in_file_size
    chunk_size = in_file_size // num_chunks
    print 'target chunk_size:', chunk_size
    files = []
    with open(infilename, 'rb', READ_BUFFER) as infile:
        for _ in xrange(num_chunks):
            temp_file = tempfile.TemporaryFile()
            while temp_file.tell() < chunk_size:
                try:
                    temp_file.write(infile.next())
                except StopIteration:  # end of infile
                    break
            temp_file.seek(0)  # rewind
            files.append(temp_file)
    return files

files = split("sample_simple.csv", num_chunks=4)
print 'number of files created: {}'.format(len(files))

for i, ifile in enumerate(files, start=1):
    print 'size of temp file {}: {}'.format(i, os.path.getsize(ifile.name))
    print 'contents of file {}:'.format(i)
    reader = csv.reader(ifile)
    for row in reader:
        print row
    print ''
person martineau    schedule 20.06.2015
comment
Спасибо за вашу помощь. Этот код действительно работает, но для файла размером 130 МБ это заняло почти 20 минут. Я часто работаю с файлами размером до 50 ГБ. Есть ли способ сделать его более эффективным? Кажется, есть много обращений к жесткому диску. - person Colin; 22.06.2015
comment
@Colin: Разделение файла по своей сути является трудоемким процессом, потому что, как минимум, оно включает в себя чтение и запись данных всего файла. Было заметное замедление, когда я добавил os.fsync() в соответствии с документацией, хотя казалось, что он работает без него в моей системе. Если допустимы временные файлы меньшего размера одинакового размера, вы можете просто сравнить размеры каждой второй или каждой третьей строки. Другой подход состоит в том, чтобы начать с математически точных точек деления, а затем настроить каждую из них, читая вперед от этой позиции до ближайшего разрыва строки. - person martineau; 22.06.2015
comment
@Colin: Вам действительно нужно физически разделить файл? Вполне возможно, что несколько процессов одновременно читают один и тот же файл. - person martineau; 22.06.2015
comment
Спасибо, теперь намного быстрее. Потребовалось около 2,5 с с обновленным кодом. Меня не волнует точный размер каждого куска, если они примерно одинаковы. Первоначально я думал, что разделю файлы, чтобы я мог передать их каждому ядру процессора. Моя цель - проанализировать файл csv и умножить определенные столбцы вместе, а затем найти его скользящее среднее. Если я получу доступ к одному и тому же файлу с 4 ядрами одновременно, это повлияет на производительность? - person Colin; 22.06.2015
comment
Я не думаю, что мое предыдущее предложение не проверять размер в каждой строке сильно повлияет на текущий код, который, как я подозреваю, теперь связан с вводом-выводом. Единственный способ обойти это - избавиться от него, поэтому я предложил просто разрешить каждому процессу доступ к одному и тому же файлу, я не думаю, что это приведет к значительным проблемам с производительностью. Все еще будут некоторые накладные расходы, связанные с отслеживанием того, какой частью файла каждый процесс должен ограничивать себя в обработке, что означает, что вам все равно нужно знать, где расположены границы строк относительно каждого фрагмента файла. - person martineau; 22.06.2015
comment
Чтобы получить доступ к файлу с помощью нескольких процессов, я должен просто открыть файл с помощью программы чтения csv в каждом процессе и заставить их перейти к соответствующей строке в файле? Я хотел бы сравнить производительность этого с разделением файла и посмотреть, что более эффективно. На заметку: с текущим кодом я попытался изменить частоту процессора между 1,4 ГГц и 1,9 ГГц, и я вижу, что это сильно влияет на время разделения, а также на время обработки. Это предполагает, что он все еще привязан к процессору. - person Colin; 22.06.2015
comment
Да, это будет базовый подход к тому, чтобы несколько процессов обращались к одному и тому же файлу, за исключением того, что каждый процесс также должен знать, когда остановиться (читая свою часть файла). Если подход с разделением файлов по-прежнему связан с процессором, то мои предыдущие предложения могут помочь — чтобы действительно знать, где тратится большая часть времени выполнения, вы должны профилировать свой код. Это довольно легко сделать со встроенным модулем profile (или cProfile). - person martineau; 23.06.2015
comment
Возможно, можно еще больше ускорить текущую версию, не используя temp_file.tell(), а вместо этого самостоятельно отслеживать, сколько байтов уже было записано в файл, тем самым избегая, по крайней мере, накладных расходов на вызов функции, если не больше. Просто добавьте длину числа байтов, которое infile.next() возвращает в накопитель bytes_written (вроде того, что вы делаете в коде в своем вопросе). - person martineau; 23.06.2015
comment
В итоге я потратил много времени, пытаясь отладить проблемы с травлением, связанные с подходом «разделить на фрагменты файлов», который я изначально хотел использовать. Кажется, файлы не являются одной из вещей, которые вы можете передать процессам. С тех пор я начал исследовать другой предложенный вами метод, а именно, чтобы все процессы обращались к одному и тому же файлу. Хотя мне удалось заставить это работать, у меня возникли проблемы с производительностью islice. Я опубликую другую тему, чтобы попросить помощи по этому поводу. - person Colin; 05.07.2015
comment
Похоже, все, что вам нужно сделать, это передать имена файлов фрагментов другим процессам, а не самим файлам (это не то, что вы хотели бы солить, даже если бы могли). Я точно не знаю, как вы используете islice для доступа к частям в других процессах, но, конечно, могу понять, как это может быть довольно неэффективно, если делать это наивно. Дайте мне знать, когда вы опубликуете другой вопрос. - person martineau; 05.07.2015
comment
Я разместил вопрос здесь: stackoverflow.com/questions /31225782/. В итоге я использовал предложение в одной из ссылок, которая создает индекс файла, чтобы можно было использовать seek(). Кажется, это решило мои проблемы с производительностью на данный момент. Тем не менее, я получаю только 50% снижение скорости при использовании 4 ядер по сравнению с 1 ядром. Я сейчас исследую, почему это так. Я могу вернуться к вашему предложению выше относительно передачи имен файлов. Как мне связать имя файла с каждым из фрагментов на основе предоставленного вами кода? - person Colin; 05.07.2015
comment
Вы можете получить имя временного файла, используя tempfile.NamedTemporaryFile() и обратившись к атрибуту .name результата. Что касается поиска, создание файлового индекса для каждой строки в файле несколько расточительно, если вам понадобится очень небольшое их количество. Тем не менее, кажется, что вы приближаетесь к сокращению времени обработки на 25% с 4 ядрами. - person martineau; 05.07.2015