Разблокировать блокировку из потока, которому она не принадлежит, или перепроектировать, чтобы избежать этого?

У меня есть объект архива, который управляет различными массивами байтов и выдает InputStream и OutputStreams для их чтения и записи. С каждым массивом байтов связан ReentrantReadWriteLock< /а>.

Конструктор подкласса InputStream, который создает архив, получает блокировку для соответствующих данных, а close() снимает блокировку. Теперь проблема:

Предположим, что у меня есть задача, которая будет выполняться в другом потоке, которому требуются входные данные из архива, которому перед запуском передается InputStream в качестве аргумента, и он отвечает за закрытие потока после его завершения. Эти требования кажутся несовместимыми с механизмом блокировки, используемым моим архивом данных, потому что

  1. InputStream создается в потоке, отличном от того, в котором он закрыт.
  2. ReentrantReadWriteLock должен быть освобожден потоком, которому он принадлежит.

Переписывать части программы, которые ожидают InputStream в качестве входных данных, чтобы избежать #1, неудобно и делает эти части менее гибкими. Использование блокировки, которая разрешает смену владельца, позволит мне избежать # 2, но я не вижу никаких блокировок в Java API, которые могли бы справиться с этим, и я не очень заинтересован в создании пользовательской блокировки, если я не т должен.

Какие есть решения этой проблемы?


person uckelman    schedule 03.02.2010    source источник
comment
Как ваш менеджер создает их в другом потоке? То есть, скорее всего, в классе менеджера есть какой-то метод, такой как getStream(<some-parameters). Потоки, использующие этот поток, будут вызывать этот метод, не так ли — и тогда вы сможете выполнить блокировку. Или вы передаете эти объекты последующим потокам или пытаетесь объединить InputStream в своем менеджере?   -  person Kevin Brock    schedule 04.02.2010
comment
@Kevin: методы, которые создают задачи, использующие InputStream, запускаются в том же потоке, где получены блокировки, но сами задачи отправляются в ExecutorService. Однако методы фабрики задач также могут вызываться с потоками, которые не поступают из моего архива данных.   -  person uckelman    schedule 04.02.2010


Ответы (5)


Самый чистый способ сделать это — дать клиентам что-то отличное от подкласса InputStream, даже если это потребует немного больше рефакторинга. В вашем текущем дизайне вы не можете рассчитывать на то, что ваши клиенты снимут блокировку, что, если один из них этого не сделает? Это не хорошая инкапсуляция.

Позвольте объекту архива управлять блокировками и предоставить вашим клиентам простой API для получения данных. Даже если это означает изменение некоторых API.

Проблема с блокировкой и освобождением одного и того же объекта разными потоками является явным признаком того, что вы немного ошиблись в своем текущем дизайне. Поработайте над вышеупомянутым рефакторингом, он вам очень поможет в дальнейшем.

person Yuval Adam    schedule 03.02.2010
comment
Я не беспокоюсь о том, что потоки не будут закрыты. Архив является полностью внутренним, а не частью внешнего API. Количество мест, где мы создаем эти потоки, относительно невелико, и они всегда закрыты внутри блоков finally --- но эти места могут быть в потоке, отличном от того, где был создан поток, и, следовательно, отличаться от того, где была блокировка. приобретенный. - person uckelman; 04.02.2010
comment
Ну, у вас все еще есть мотивация, почему вы должны обрабатывать блокировку в объекте архива... Я сомневаюсь, что есть более чистый способ добиться этого. - person Yuval Adam; 04.02.2010
comment
Неохотно, это решение, которое я реализовал. Я по-прежнему считаю, что наиболее элегантным и простым способом справиться с этим будет повторная блокировка, для которой владелец может быть установлен равным нулю, что указывает на то, что любой поток может ее освободить. К сожалению, метод установки владельца блокировки скрыт в частном внутреннем классе ReentrantReadWriteLock. - person uckelman; 09.02.2010

Судя по вашему ответу на мой вопрос. Вы можете просто отложить создание (и блокировку) InputStream, передав пользовательский класс, используемый для изготовления InputStream, в ThreadExecutorService. Затем поток в службе запрашивает InputStream из этого объекта, который сначала создает его (поскольку он не существует), а затем все готово.

Например:

public interface InputStreamMaker {
    public InputStream getOrCreateInputStream();
}

Затем в вашем основном классе, который запускает все:

static class InputStreamMakerImpl implements InputStreamMaker {
    private final Manager manager; // Your manager class (or use a normal Inner class on manager)
    private InputStream is;

    // other variables needed to define how to create input stream for the particular task here

    InputStreamMakerImpl(Manager manager) {
        this.manager = manager;
    }

    public InputStream getOrCreateInputStream() {
         if (is == null) {
             // pass this object - so manager will have all the details to create the right stream
             is = manager.createNewStream(this);
         }

         return is;
    }
}

а потом в основной ветке...

...some method...
InputStreamMakerImpl maker = new InputStreamMakerImpl(manager /* or this */);
// set values needed in maker for your manager class to create the right input stream
// start the worker and pass the InputStreamMaker (maker) as the parameter instead of the InputStream

Этот внутренний класс может иметь дополнительную информацию, необходимую для создания правильного потока для рабочего процесса. Теперь InputStream создается/открывается в правильном потоке, и этот поток также может закрыть его и разблокировать блокировку.

Изменить: перечитав ваш вопрос, я вижу, что вы, возможно, не захотите изменять какие-либо объекты Worker, которые уже принимают InputStream. Тогда вы все еще можете сделать это:

  1. Создание собственного подкласса InputStream, который содержит все значения того, что я определил выше как InputStreamMakerImpl.

  2. Пользовательский InputStream имеет внутренний метод getInputStream(), который делает то, что я определил для getOrCreateInputStream() выше.

  3. Тогда все методы для InputStream просто делегируют вызовы, например:

    public int read() throws IOException {
        return getInputStream().read();
    }
    
person Kevin Brock    schedule 04.02.2010
comment
Проблема, которую я здесь вижу, заключается в том, что как только вы использовали поток в каком-то потоке, этот поток становится его владельцем. - person uckelman; 09.02.2010

java.io.InputStream имеет всего девять методов. Вы уже переопределили один, то есть метод close().

Я бы порекомендовал вам иметь ленивую блокировку, т.е. не агрессивно приобретать блокировку в конструкторе. Для этого сделайте ReentrantReadWriteLock доступным в вашем подклассе InputStream и создайте закрытый метод с именем doAcquireLock(), который пытается получить блокировку. Затем переопределите все остальные восемь методов InputStream таким образом, чтобы во всех переопределенных методах сначала вызывался doAcquireLock(), а затем делегировал операцию super.

пример:

public int read() 
    doAcquireLock(); // This method get's the lock
    return super.read();
}

С этой блокировкой будет получен поток, который первым вызывает любой из восьми вышеперечисленных методов, и также будет отвечать за закрытие.

Просто чтобы убедиться, что вы не попали в тупиковую ситуацию, вам нужно следить за тем, чтобы клиент вызывал метод закрытия в конце, чтобы снять блокировку. Также ни один из методов не должен вызываться после вызова метода close с помощью логического индикатора, чтобы узнать, закрыт ли поток (устарел).

person Gladwin Burboz    schedule 06.02.2010
comment
Что произойдет, если клиентский поток не вызовет закрытие? Вы действительно должны быть осторожны с этим. - person Gladwin Burboz; 06.02.2010
comment
Если ваш клиентский поток не вызывает close(), он просто сломан. Что, если ваш клиентский поток разыменует нулевое значение или вызовет System.exit()? - person uckelman; 07.02.2010
comment
Вызов close() является обязанностью клиента. Если клиент приобрел ресурс, то ответственность за его освобождение лежит на клиенте. Система.выход()? Вся программа закрывается, поэтому никаких блокировок нет. - person Gladwin Burboz; 08.02.2010
comment
Вы можете использовать некоторые идеи из пула соединений JDBC для возврата неиспользуемых потоков. download-uk.oracle.com/ документы/cd/B28359_01/java.111/e10788/ - person Gladwin Burboz; 08.02.2010
comment
Это кажется мне разумным решением, пока проверка, есть ли у нас замок, достаточно дешево. - person uckelman; 09.02.2010

Решение №2.

Если вам не нужно удерживать блокировку до конца потока, вы можете реализовать свой собственный InputStream, как показано ниже, который вы можете предоставить своим клиентам. Теперь всякий раз, когда какой-либо поток должен прочитать, он получит блокировку, затем прочитает и, наконец, автоматически снимет блокировку, как только чтение будет завершено. Ваш пользовательский OutputStream должен быть реализован аналогичным образом с блокировкой записи.

.

public class LockingInputStream extends InputStream {
    private final InputStream byteArrayInputStream;
    private final Lock r;

    public LockingInputStream(
                    InputStream byteArrayInputStream, 
                    ReentrantReadWriteLock rwl) {
        this.byteArrayInputStream = byteArrayInputStream;
        this.r = rwl.readLock();
    }

    @Override
    public int read() throws IOException {
        r.lock();
        try {
            return byteArrayInputStream.read();
        } finally { r.unlock(); }
    }
    @Override
    public int available() throws IOException {
        r.lock();
        try {
            return byteArrayInputStream.available();
        } finally { r.unlock(); }
    }
    ....
    @Override
    public void close() throws IOException {
        r.lock();
        try {
            byteArrayInputStream.close();
        } finally { r.unlock(); }
    }
}
person Gladwin Burboz    schedule 09.02.2010
comment
Недостатком этого является то, что другой поток может встать между двумя вашими чтениями и изменить данные, которые вы читаете. - person uckelman; 09.02.2010

Еще одно решение, на которое я наткнулся, обдумывая это: один из способов рассмотрения проблемы заключается в том, что она вызвана повторным входом в блокировку. ReentrantReadWriteLock отслеживает владение блокировкой для обработки повторного входа. Если я хочу получить блокировки при создании потока и снять блокировки при закрытии потока, а также закрыть потоки в потоках, отличных от тех, в которых они были созданы, то блокировка с повторным входом не будет работать, потому что неправильный поток будет владеть блокировкой потока.

Один из способов обойти это - использовать блокировку подсчета, но принудительно заблокировать все наружу. Во-первых, подсчет блокировки: состояния представляют собой целые числа -1,0,1,...., где -1 означает запись, 0 означает разблокировку, а положительное n подсчитывает количество одновременных операций чтения. Таким образом, записи по-прежнему являются эксклюзивными, но блокировки не зависят от потока. Принудительная внешняя блокировка: у нас есть такие методы, как exists(), где нам нужно получить блокировку чтения, но exists() вызывается некоторыми методами, которые уже будут иметь блокировку записи. Итак, мы разделяем внутренности exists() на защищенный existsImpl(), который должен вызываться только тогда, когда блокировка уже удерживается (то есть внутренне, другими методами *Impl()). Таким образом, после получения блокировки только неблокирующие методы Impl вызываются, и мы избегаем необходимости повторного входа.

Это все хорошо, за исключением того, что в конечном итоге это ужасно сложно, чтобы гарантировать, что вы не облажаетесь при его реализации --- вызов общедоступного заблокированного метода, когда вы должны вызывать защищенный неблокируемый --- и тем самым создавая тупик. Во-вторых, форсирование всех ваших блокировок наружу означает, что вы держите блокировки дольше, так что это (вероятно) не очень хорошо для параллелизма.

Итак, это то, что я пробовал, но решил не использовать.

person uckelman    schedule 09.02.2010