Многопоточность — это способность программы выполнять несколько потоков одновременно.
Процесс — это экземпляр выполняемой программы. Он имеет свои собственные ресурсы, такие как память, файловые дескрипторы и т.д. Каждый процесс изолирован от других.
Поток — это подмножество процесса. Потоки внутри одного процесса разделяют ресурсы, такие как память, что позволяет им взаимодействовать более эффективно. Каждый поток разделяет память с другими потоками своего процесса.
Основной плюс многопоточности: повышение производительности
Основной минус многопоточности: сложность разработки и отладки
class Program
{
/// <summary>
/// Функция, которая будет выполняться в отдельном потоке
/// </summary>
static void Work()
{
Console.WriteLine("Второстепенный поток стартовал");
Thread.Sleep(2000); // Симулируем работу
Console.WriteLine("Второстепенный поток финишировал");
}
static void Main()
{
// создаем поток, указывая ссылку на функцию, которая будет в нем выполняться
var thread = new Thread(Work);
// запускаем поток
thread.Start();
// здесь можно писать код, не зависимый от второстепенного потока
Console.WriteLine("Работает главный поток");
// Ожидаем завершения потока
thread.Join();
// код продолжит выполняться только когда второстепенный поток завершит работу
Console.WriteLine("Главный поток закончил работу");
}
}
Метод Work соответствует делагату ThreadStart (ничего не принимает, ничего не возвращает). Если нам необходимо передать данные в поток, то можно использовать делега ParameterizedThreadStart (принимает Object?, ничего не возвращает)
class Program
{
static void Main()
{
Thread thread = new Thread(new ParameterizedThreadStart(WorkWithParameter));
thread.Start("Параметр в потоке!");
static void WorkWithParameter(object? obj)
{
if (obj is null) {
return;
}
string message = (string)obj;
Console.WriteLine(message);
}
}
}
Главный и создаваемый поток не имеют общих ресурсов. В приложениях обычно общие ресурсы есть. Можем получить "состояние гонки" (race condition).
class Program
class Program
{
static int counter = 0;
/// <summary>
/// В каждом потоке будем прибавлять миллион раз делать инкремент
/// </summary>
static void IncrementCounter()
{
for (int i = 0; i < 1000000; i++)
{
counter++;
}
}
static void Main()
{
// будет 5 потоков
var threads = new Thread[5];
// Создаем и запускаем потоки
for (int i = 0; i < threads.Length; i++)
{
threads[i] = new Thread(IncrementCounter);
threads[i].Start();
}
// Ждем завершения всех потоков
foreach (Thread thread in threads)
{
thread.Join();
}
Console.WriteLine($"Значение счетчика: {counter}");
}
}
В виду того, что инкремент не атомарен, результат все время будет разный (поскольку потоки будут переписывать результаты работы друг друга). Для решения этой проблемы следует использовать механизмы синхронизации
class Program
{
static int counter = 0;
// вспомогательный объект, для которого будет выполняться синхронизация
static object lockObject = new object();
// добавляем
/// <summary>
/// В каждом потоке будем прибавлять миллион раз делать инкремент
/// </summary>
static void IncrementCounter()
{
for (int i = 0; i < 1000000; i++)
{
// делаем блокировку (тип параметра должен быть ссылочным, приходится хитрить)
lock(lockObject)
{
// эта часть кода в один момент может выполняться только одним потоком
// он получает блокировку на lockObject
counter++;
}
}
}
static void Main()
{
// будет 5 потоков
var threads = new Thread[5];
// Создаем и запускаем потоки
for (int i = 0; i < threads.Length; i++)
{
threads[i] = new Thread(IncrementCounter);
threads[i].Start();
}
// Ждем завершения всех потоков
foreach (Thread thread in threads)
{
thread.Join();
}
Console.WriteLine($"Значение счетчика: {counter}");
}
}
Существуют более сложные механизмы синхронизации, но они не должны потребоваться при выполнении ЛР 3. А вот что потребуется точно - это коллекции (очередь и словарь). Но не совсем обычные...
С обычными коллекциями также бывают проблемы
Queue<int> queue = new Queue<int>();
void EnqueueItems()
{
for (int i = 0; i < 1000; i++)
{
queue.Enqueue(i);
}
}
var thread1 = new Thread(EnqueueItems);
var thread2 = new Thread(EnqueueItems);
thread1.Start();
thread2.Start();
thread1.Join();
thread2.Join();
Console.WriteLine($"Всего элементов в очереди: {queue.Count}");
Этот код может привести к исключению. Можно использовать механизмы синхронизации, но лучше - потокобезопасные коллекции.
using System.Collections.Concurrent;
ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
void EnqueueItems()
{
for (int i = 0; i < 1000; i++)
{
queue.Enqueue(i);
}
}
var thread1 = new Thread(EnqueueItems);
var thread2 = new Thread(EnqueueItems);
thread1.Start();
thread2.Start();
thread1.Join();
thread2.Join();
Console.WriteLine($"Всего элементов в очереди: {queue.Count}");
При выполнении ЛР 3 вам может понадобиться ConcurrentDictionary для хранения таблицы сопоставления игр и потоков.
Реализация паттерна "производитель-потребитель" (в данном случае - несколько потребителей)
using System.Collections.Concurrent;
BlockingCollection<int> collection = new BlockingCollection<int>();
void Producer()
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine($"Отправляем задачу {i}");
collection.Add(i);
Thread.Sleep(500);
}
collection.CompleteAdding();
}
void Consumer(object? c)
{
foreach (var item in collection.GetConsumingEnumerable())
{
Console.WriteLine($"Выполнили задачу {item} в потоке {(int)c}");
}
}
var producerThread = new Thread(Producer);
var consumerThread1 = new Thread(Consumer);
var consumerThread2 = new Thread(Consumer);
var consumerThread3 = new Thread(Consumer);
producerThread.Start();
consumerThread1.Start(1);
consumerThread2.Start(2);
consumerThread3.Start(3);
producerThread.Join();
consumerThread1.Join();
consumerThread2.Join();
consumerThread3.Join();
Работа с очередями задач
using System.Collections.Concurrent;
class Program
{
// Количество рабочих потоков
static int numberOfThreads = 3;
// Массив рабочих потоков
static Thread[] workerThreads;
// Массив очередей задач для каждого потока
static BlockingCollection<Action>[] taskQueues;
static void Main()
{
// Инициализируем массивы
workerThreads = new Thread[numberOfThreads];
taskQueues = new BlockingCollection<Action>[numberOfThreads];
// Создаем очереди задач и стартуем потоки
for (int i = 0; i < numberOfThreads; i++)
{
int threadIndex = i;
taskQueues[i] = new BlockingCollection<Action>();
// Создаем и запускаем рабочий поток
// можно и так данные передавать в поток (здесь передается индекс)
workerThreads[i] = new Thread(() => ProcessTasks(threadIndex));
workerThreads[i].IsBackground = true;
workerThreads[i].Start();
Console.WriteLine($"Поток {threadIndex} запущен.");
}
// Добавляем задачи
for (int i = 0; i < 10; i++)
{
var threadIndex = i % numberOfThreads; // распределянм задачи по потокам
var taskNumber = i; // Номер задачи для отображения
// Добавляем задачу в очередь потока
taskQueues[threadIndex].Add(() =>
{
Console.WriteLine($"Задача {taskNumber} выполняется в потоке {threadIndex}");
// Имитируем работу
Thread.Sleep(500);
});
Console.WriteLine($"Задача {taskNumber} добавлена в очередь потока {threadIndex}");
}
// Пусть задачи выполняются 6 секунд
Thread.Sleep(6000);
// Сообщаем потокам, что больше не будет задач
foreach (var queue in taskQueues)
{
queue.CompleteAdding();
}
// Ожидаем завершения всех рабочих потоков
foreach (var thread in workerThreads)
{
thread.Join();
}
Console.WriteLine("Все задачи выполнены.");
}
// Метод обработки задач для рабочего потока
static void ProcessTasks(int threadIndex)
{
Console.WriteLine($"Поток {threadIndex} начинает обработку задач.");
// Получаем очередь задач для текущего потока
var queue = taskQueues[threadIndex];
// Получаем задачи из очереди и обрабатываем их
foreach (var task in queue.GetConsumingEnumerable())
{
task();
}
Console.WriteLine($"Поток {threadIndex} завершил обработку задач.");
}
}
Несколько тонкостей при работе с BlockingQueue: