Потоки появились еще в Windows NT, но до определенного времени редко использовались прикладными программистами. В наше время, когда даже самый захудалый офисный компьютер обладает как минимум двумя процессорными ядрами, не использовать потоки в программах просто неприлично. В этой статье мы рассмотрим реализацию многопоточности в Delphi 6, Delphi 7 и Delphi 2009. Базовые принципы работы с классом TThread рассматривать не будем, для этого есть встроенная документация. Мы же посмотрим, что у системы под капотом. В качестве введения я кратко опишу две основные проблемы, возникающие в многопоточном программировании.
В процессе разработки многопоточного приложения приходится решать две взаимосвязанные проблемы - разграничение доступа к ресурсам и взаимоблокировки. Если несколько потоков обращаются к одному и тому же ресурсу (области памяти, файлу, устройству) при небрежном программировании может возникнуть ситуация, когда сразу несколько потоков попытаются выполнить некие манипуляции с общим ресурсом. При этом нормальная последовательность операций при обращении к ресурсу, скорее всего, будет нарушена. Проблема с разграничением доступа может возникнуть даже при очень простых операциях. Предположим, у нас есть программа, которая создает несколько потоков. Каждый поток выполняет свою задачу, и затем завершается. Мы хотим контролировать количество потоков, активных в данное время, и с этой целью вводим счетчик потоков - глобальную переменную Counter. Процедура потока при этом может выглядеть так:
procedure MyThread.Execute;
begin
Inc(Counter);
...
Dec(Counter);
end;
Одна из проблем, связанных с этим кодом заключается в том, что процедуры Inc и Dec не атомарны (например, процедуре Inc требуется выполнить три инструкции процессора - загрузить значение Counter в регистр процессора, выполнить сам инкремент, затем записать результат из регистра процессора в область памяти Counter). Нетрудно догадаться, что если два потока попытаются выполнить процедуру Inc одновременно, значение Counter может быть увеличено на 1 вместо 2. Такие ошибки трудно обнаружить при отладке, так как вероятность ошибки не равна единице. Волне может случиться так, что при тестовом прогоне программы все будет работать нормально, а ошибку обнаружит уже заказчик...
Решением проблемы может стать использование специальных функций. Например, вместо процедуры Inc можно использовать процедуру Win API InterlockedIncrement, а вместо Dec - InterlockedDecrement. Эти процедуры гарантируют, что в любой момент времени не более одного потока получит доступ к переменной-счетчику.
В общем случае для решения проблемы разделения доступа используются блокировки - специальные механизмы, которые гарантируют, что в каждый данный момент времени только один поток имеет доступ к некоторому ресурсу. В то время, когда ресурс заблокирован одним потоком, выполнение других потоков, пытающихся получить доступ к ресурсу, приостанавливается. Однако использование блокировок создает другую проблему - взаимоблокировки (deadlocks). Взаимоблокировка наступает тогда, когда потока A блокирует ресурс, необходимый для продолжения работы потока B, а поток B блокирует ресурс, необходимый для продолжения работы потока A. Поскольку ни один поток не может продолжить выполнение, заблокированные ресурсы не могут быть разблокированы, что приводит к повисанию потоков.
Потоки в Delphi 7
По сравнению с Delphi 6 изменений в работе с потоками в Delphi 7 не так уж и много. Рассмотрим реализацию функции CheckSynchronize:
function CheckSynchronize(Timeout: Integer = 0): Boolean;
var
SyncProc: PSyncProc;
LocalSyncList: TList;
begin
if GetCurrentThreadID <> MainThreadID then
raise EThread.CreateResFmt(@SCheckSynchronizeError, [GetCurrentThreadID]);
if Timeout > 0 then
WaitForSyncEvent(Timeout)
else
ResetSyncEvent;
LocalSyncList := nil;
EnterCriticalSection(ThreadLock);
try
Integer(LocalSyncList) := InterlockedExchange(Integer(SyncList), Integer(LocalSyncList));
try
Result := (LocalSyncList <> nil) and (LocalSyncList.Count > 0);
if Result then
begin
while LocalSyncList.Count > 0 do
begin
SyncProc := LocalSyncList[0];
LocalSyncList.Delete(0);
LeaveCriticalSection(ThreadLock);
try
try
SyncProc.SyncRec.FMethod;
except
SyncProc.SyncRec.FSynchronizeException := AcquireExceptionObject;
end;
finally
EnterCriticalSection(ThreadLock);
end;
SetEvent(SyncProc.signal);
end;
end;
finally
LocalSyncList.Free;
end;
finally
LeaveCriticalSection(ThreadLock);
end;
end;
В новой версии вместо флага ProcPosted используется событие SyncEvent, для управления которым создано несколько функций: SetSyncEvent, ResetSyncEvent, WaitForSyncEvent. Метод WaitFor использует событие SyncEvent для оптимизации цикла обработки сообщений. Установка SyncEvent сигнализирует о том, что в очереди появился новый метод, ожидающий синхронизации, и требуется вызвать CheckSynchronize.
У метода CheckSynchronize появился параметр TimeOut, который указывает, сколько времени метод должен ждать события SyncEvent, прежде чем вернуть управление. Указывать время ожидания удобно там, где метод CheckSynchronize вызывается в цикле (при этом поток, вызывавший CheckSynchronize, отдает свое процессорное время другим потокам, вместо того, чтобы крутить вызовы вхолостую), однако и продолжительность вызова метода CheckSynchronize может неоправданно возрасти. Обратите внимание так же на то, как в Delphi 7 изменилась работа с очередью SyncList. В предыдущей версии CheckSynchronize очередь SyncList захватывалась (с помощью ThreadLock) на все время обработки помещенных в очередь методов (а это время могло быть сравнительно большим). А ведь пока CheckSynchronize владеет объектом SyncList, операции с очередью SyncList, выполняемые из других потоков, блокируются. Для того чтобы высвободить SyncList как можно скорее, сохраняет указатель на текущий объект очереди (с помощью функции Win API InterlockedExchange) в локальной переменной LocalSyncList, а переменной SyncList присваивает значение nil. После этого доступ к переменной SyncList открывается снова. Теперь, если другой поток захочет снова синхронизировать метод, ему понадобится создать новый объект SyncList, однако доступ к очереди блокируется только на время, необходимое для обмена указателями, так что общий выигрыш производительности должен быть значителен.
Недостатки реализации потоков в Delphi
Самым главным недостатком следует признать метод, применяемый для приостановки и возобновления выполнения потока. С этой целью в VCL используются функции Windows API SuspendThread и ResumeThread, которые вообще говоря. Предназначены для отладочных целей. Функция SuspendThread может остановить выполнение потока в любой точке. Поток не может запретить приостановку на время выполнения критического фрагмента кода и не получает оповещения о том, что он будет приостановлен. Обмен сообщениями между вторичными потоками и главным потоком продуман достаточно хорошо, в последних версиях Delphi добавлены даже асинхронные вызовы, а вот стандартных механизмов передачи сообщений от главного потока к второстепенным не существует. Тут надо отметить, что под "главным потоком" мы понимаем поток, в котором выполняется метод Application.Run, и обрабатываются события. Delphi плохо подходит для модели, в которой все потоки равноправны.
Расширение возможностей
В качестве демонстрации низкоуровневой работы с очередью вызовов в программе Delphi, напишем функцию ExecAfter. Иногда нам бывает нужно указать, чтобы некоторая процедура выполнялась после выхода из той процедуры, в которой мы находимся. Обычно для решения этой задачи с помощью функции Win API PostMessage главному окну посылается какое-нибудь пользовательское сообщение, обработчик которого вызывает требуемую процедуру. Недостатком этого решения является то, что оно работает только в графических программах, и нам приходится определять собственную обработку сообщений, которые получает окно. В то же время структура очереди сообщений Delphi 2009 позволяет нам поместить в очередь асинхронный вызов, который будет выполнен при следующем вызове CheckSynchronize, то есть, после выхода из процедуры, в которой мы сейчас находимся. Может возникнуть соблазн воспользоваться вызовом
TThread.Queue(nil, MethodToExecute);
но в главном потоке это не сработает. Метод Synchronize, который вызывает Queue, проверит, не вызван ли он из главного потока и в этом случае выполнит MethodToExecute, не откладывая. Итак, процедура ExecAfter:
procedure ExecAfter(AMethod : TThreadMethod);
var
SyncProcPtr: PSyncProc;
SyncRecPtr: PSynchronizeRecord;
begin
New(SyncProcPtr);
New(SyncRecPtr);
SyncRecPtr.FThread := nil;
SyncRecPtr.FMethod := AMethod;
SyncRecPtr.FProcedure := nil;
SyncRecPtr.FSynchronizeException := nil;
SyncProcPtr.Signal := 0;
EnterCriticalSection(ThreadLock);
try
SyncProcPtr.Queued := True;
if SyncList = nil then
SyncList := TList.Create;
SyncProcPtr.SyncRec := SyncRecPtr;
SyncList.Add(SyncProcPtr);
SignalSyncEvent;
if Assigned(WakeMainThread) then
WakeMainThread(SyncProcPtr.SyncRec.FThread);
finally
LeaveCriticalSection(ThreadLock);
end;
end;
Наш вариант ExecAfter позволяет выполнить вызов метода AMethod после выхода из той процедуры, в которой мы вызвали ExecAfter (при желании процедуру нетрудно переписать для вызова самостоятельных процедур, а не методов). Реализация процедуры ExecAfter должна быть расположена в модуле Classes, иначе мы не сможем получить доступ к необходимым нам переменным ThreadLock и SyncList. Между прочим, если вы не хотите вносить изменения в "главную" копию Classes, вы можете использовать локальную копию для конкретной программы. Для того скопируйте измененный файл Classes.pas в директорию проекта. Теперь, если добавить в программу последовательность:
TForm1.Method1;
begin
ExecAfter(Method2);
Method3;
end;
Method2 будет выполнен после Method3 (и после выхода из Method1).
По адресу http://symmetrica.net/Delphi/JoinableThreads.htm вы найдете еще одно расширение - модуль JoinableThreads, который реализует производный от TThread класс TJoinableThread и функцию Join, которая представляет собой аналог WaitFor, позволяющий дождаться завершения всех ранее созданных потоков TJoinableThread.