Search     or:     and:
 LINUX 
 Language 
 Kernel 
 Package 
 Book 
 Test 
 OS 
 Forum 
 iakovlev.org 
      Languages 
      Kernels 
      Packages 
      Books 
      Tests 
      OS 
      Forum 
      Математика 
NEWS
Последние статьи :
  Тренажёр 16.01   
  Эльбрус 05.12   
  Алгоритмы 12.04   
  Rust 07.11   
  Go 25.12   
  EXT4 10.11   
  FS benchmark 15.09   
  Сетунь 23.07   
  Trees 25.06   
  Apache 03.02   
 
TOP 20
 Advanced Bash Scripting G...1859 
 Ethreal 4...1286 
 Secure Programming for Li...1060 
 CPAN-> FAQ...950 
 Intel 386...630 
 Тренажёр...510 
 Go Web ...495 
 Trees...425 
 Ethreal 1...423 
 Максвелл 3...393 
 Alg1...381 
 Rust...369 
 C + UNIX...348 
 Ext4 FS...344 
 Assembler...341 
 William Gropp...337 
 Mod_parrot...329 
 2.0-> Linux IP Networking...326 
 Benchmark...320 
 Rodriguez 6...316 
 
  01.01.2025 : 3803065 посещений 

iakovlev.org

Concurrency

Goroutine - функция, которая может согласованно выполняться с другими функциями. Для создания рутины используется ключевое слово go.
Channel - канал, по которому можно передавать данные, в частности между рутинами, и синхронизировать их работу. Канал может иметь произвольный тип. Каналы можно использовать где угодно: в качестве полей структур, параметров функций, Каналы используются в том числе для синхронизации рутин.
В следующем примере создается один двунаправленный канал, который передается в качестве параметра. Он организует (или синхронизирует) работу между тремя рутинами. Две рутины отсылают данные в канал, а третья принимает. Оператор <- служит для отсылки данных в канал и приемки данных из канала. Если этот оператор стоит слева от переменной, обозначающей канал, это значит, что канал принимает данные извне. Если оператор стоит справа от переменной, это означает отсылку данных:

 func sender(c chan int) {
   for i := 0; i< 11; i++ {
     c <- i
   }
 }
 
 func sender2(c chan int) {
   for i := 11; i< 20; i++ {
     c <- i
   }
 }
 
 func receiver(c chan int) {	
   for {
     msg := <- c
     fmt.Println(msg)
   }
 }
 
 func main() {
   var c chan int = make(chan int)
 
   go sender(c)
   go sender2(c)
   go receiver(c)
 
   var input string
   fmt.Scanln(&input)
 }
Канал можно сделать однонаправленным - для этого нужно изменить заголовок функции:
func receiver(c <- chan int) {
В первом примере в качестве рутины выступает именованная функция. Анонимная функция также может выступать в качестве рутины. В следующем примере мы используем второй вариант. Будут созданы два канала, два сендера и один получатель. Отличие второго примера от первого в том, что получатель будет принимать данные не из одного, а из двух каналов. Для этого используется выражение, аналогичное switch, но только для каналов:
select

 func main() {
   var c1 chan int = make(chan int)
   var c2 chan int = make(chan int)
 
   go func () {
     for i := 0; i< 11; i++ {
       c1 <- i
     }
   }()
   
   go func () {
     for i := 11; i< 20; i++ {
       c2 <- i
     }
   }()
   
   go func () {
     for {
     select {
       case num1 := <- c1:
 	fmt.Println(num1)
       case num2 := <- c2:
 	fmt.Println(num2)
      }
     }
   }()
   
 
   var input string
   fmt.Scanln(&input)
 }
В приведенных примерах использовались небуфферизованные каналы, их особенность в том, что это синхронные каналы, в том смысле, что если мы попытаемся записать данные в уже непустой канал, операция записи заблокирует программу до тех пор, пока данные оттуда не будут прочитаны. Эту особенность можно проиллюстрировать следующим примером - в нем создается небуферизованный канал, делается попытка записать в него сразу три сообщения, что блокируется после первой же записи в канал до тех пор, пока мы не начинаем читать из него:

      message := make(chan string) // no buffer
      count := 3
      go func() {
           for i := 1; i <= count; i++ {
                fmt.Println("send message")
                message <- fmt.Sprintf("message %d", i)
           }
      }()
      time.Sleep(time.Second * 3)
      for i := 1; i <= count; i++ {
           fmt.Println(<-message)
      }
Буфферизованные асинхронные каналы создаются с дополнительным параметром - capacity.
Если мы сделаем попытку записать в канал или прочитать из канала сообщений больше, чем его буфер, произойдет дедлок:

     c := make(chan int, 2)
     c <- 1
     c <- 2
     c <- 3
     fmt.Println(<-c)
     fmt.Println(<-c)
 
Чтобы этого не происходило, канал нужно закрывать командой close. В следующем примере мы создаем канал с буфером=10, создаем рутину, в которой заполняем этот канал и закрываем канал, а потом читаем из него. Буферизованный канал позволяет делать итерацию, но перед этим канал нужно закрывать. После чего чтение из закрытого канала - неблокирующая операция:

 func fibonacci(n int, c chan int) {
     x, y := 0, 1
     for i := 0; i < n; i++ {
         c <- x
         x, y = y, x+y
     }
     close(c)
 }
 
 func main() {
     c := make(chan int, 10)
     go fibonacci(cap(c), c)
     for i := range c {
         fmt.Println(i)
     }
 }
 
Иногда возникают ситуация, когда канал по каким-то причинам задерживает отдачу. В этом случае можно использовать timeout для ограничения времени отклика. В следующем примере делается http-запрос. Создаются два канала - один для ответа и второй для ошибки. С помощью оператора select обрабатывается возможные сценарии, когда ответ на запрос из канала может вернуть ошибку либо задержаться:

 	response := make(chan *http.Response, 1)
 	errors   := make(chan *error)
 
 	go func() {
 		resp, err := http.Get("http://iakovlev.org/")
 		if err != nil {
 			errors <- &err
 		}
 		response <- resp
 	}()
 	for {
 		select {
 		case r := <-response:
 			fmt.Printf("%s", r.Body)
 			return
 		case err := <-errors:
 			log.Fatal(err)
 		case <-time.After(100 * time.Millisecond):
 			fmt.Printf("Timed out!")
 			return
 		}
 	}    
 
В гоу concurrency - это не параллелизм. Для обьяснения этого парадокса рассмотрим код, в котором не будет ни одной рутины, в нем будут две анонимных функции, которые по всем законам жанра будут выполняться строго последовательно - именно в том порядке, в котором они прописаны:

     fmt.Println("Start")
     func() {
 	time.Sleep(1000000 * time.Microsecond)
         for char := 'a'; char < 'a'+26; char++ {
             fmt.Printf("%c ", char)
         }
     }()
     println()
     func() {
 
         for number := 1; number < 27; number++ {
             fmt.Printf("%d ", number)
         }
     }()
     println()
     fmt.Println("End")
 
 Вывод:
 Start
 a b c d e f g h i j k l m n o p q r s t u v w x y z 
 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
 End
 
Вывод вполне предсказуем, при этом обратите внимание на тот факт, что во время паузы в первой анонимной функции программа будет заблокирована на период ожидания.
Перепишем этот пример - анонимные функции сделаем рутинами с помощью go и добавим синхронизацию:

     var wg sync.WaitGroup
     wg.Add(2)
     fmt.Println("Start")
     go func() {
 	defer wg.Done()
 	time.Sleep(1000000 * time.Microsecond)
         for char := 'a'; char < 'a'+26; char++ {
             fmt.Printf("%c ", char)
         }
     }()
     go func() {
 	defer wg.Done()
         for number := 1; number < 27; number++ {
             fmt.Printf("%d ", number)
         }
 	println()
     }()
     wg.Wait()
     println()
     fmt.Println("End")
 Вывод:
 Start
 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 
 a b c d e f g h i j k l m n o p q r s t u v w x y z 
 End
 
Порядок работы и вывод существенно меняются - сначала выполнится вторая рутина, а потом первая, т.е. разница в том, что теперь первая рутина не блокирует ход программы, у нас есть возможность автоматически переключиться между рутинами в том случае, если одна занята ожиданием. Т.е. concurrency дает возможность использовать ресурсы на сто процентов.
Когда мы говорим, что concurrency - это не параллелизм, мы имеем ввиду тот факт, что в гоу по умолчанию программа выполняется одним процессором. Если мы ходим использовать больше одного ядра, т.е. сделать программу параллельной, мы должны сделать обьявление - runtime.GOMAXPROCS(2):

     runtime.GOMAXPROCS(2)
     var wg sync.WaitGroup
     wg.Add(2)
     fmt.Println("Start")
     go func() {
 	defer wg.Done()
         for char := 'a'; char < 'a'+26; char++ {
             fmt.Printf("%c ", char)
         }
     }()
     go func() {
 	defer wg.Done()
         for number := 1; number < 27; number++ {
             fmt.Printf("%d ", number)
         }
 	println()
     }()
     wg.Wait()
     println()
     fmt.Println("End")
 Вывод:
 Start
 1 2 3 4 5 a 6 b c d e f g h i j k l m n o p 7 q r 8 s t u v w x y z 9 10 ... 
 End
 
В данном случае это будет уже программа с двумя рутинами, выполняемыми параллельно на разных ядрах, и каждый раз будет генерироваться непредсказуемый вывод.

Race condition - ситуация, когда многопоточная программа дает непредсказуемый результат.
Такой результат зачастую невозможно воспроизвести.
Одна из разновидностей - data race - случается тогда, когда две рутины получают одновременный доступ на запись к одной и той же переменной. Есть три варианта для обхода data race:
1. Сделать доступ этой переменной только на чтение - правда, в большинстве случаев это не подходит
2. Сделать доступ на запись в переменную только для одной рутины
3. Можно использовать мьютекс, у которого есть методы блокирования и разблокирования:

     mu sync.Mutex 
     mu.Lock()
     ...
     mu.Unlock()
 
Область кода между Lock() и Unlock() называется критической секцией. Функции с такими секциями называются мониторами.
Иногда возникает ситуация, когда вызов Unlock() полезно ложить в отложенную функцию defer, чтобы быть уверенным в гарантированной разблокировке - в следующем примере разблокировка произойдет уже после того, как функция вернет значение - поэтому это concurrency-safe функция, даже если в функции случится паника и там будет стоять вызов рекавери, разблокировка все равно сработает :

 func Balance() int {
   mu.Lock()
   defer mu.Unlock()
   return balance
 }
 
 
Простая ситуация, когда возможен дедлок: есть две функции, одна вызывается внутри другой, обе функции используют один и тот же мьютекс для блокировки, при вызове вложенной функции и произойдет дедлок, потому что у мьютекса нельзя вызывать два раза подряд один и тот же метод
mu.Lock()
Есть специальный тип мьютекса с эксклюзивным доступом на запись, но он работает медленнее обычного:
var mu sync.RWMutex
В пакете sync есть еще один обьект
sync.Once
Он включает в себя одновременно мьютекс и булевский флаг. У него есть метод
once.Do(oblect)
При первом вызове этого метода флаг установится в true, при дальнейших вызовах чтение расшаренного в памяти обьекта будет оптимизировано.

В майнфреймовых языках считается правилом хорошего тона использование блокировок с помощью мьютексов. Гоу также предоставляет аналогичную возможность. Кроме этого, в гоу есть другая возможность для синхронизации - для этого можно использовать каналы.
В следующем примере показано, как работать с "глобальным" словарем, при этом мьютексы не используются. В словаре создается два счетчика, затем они наращиваются, после чего выводятся на экран. На самом деле словарь здесь не глобален, а локален внутри одной функции, но виден он отовсюду. Канал, по которому происходит обмен, предоставляет доступ к этому словарю из любой точки программы. Этот небуферизованный - синхронный - канал является одновременно синронизатором доступа к словарю:

 package main
 
 import "sync"
 
 type request struct {
 	key int
 	value int
 	op string
 	ret chan int
 } 
 
 func set(c chan request, key int)  {
 	c <- request{key, 0, "set", nil}
 }
 
 func get(c chan request, key int) int {
 	result := make(chan int)
 	c <- request{key, 0, "get", result}
 	return <-result
 	
 }
 
 func add(c chan request, key int)  {
 	c <- request{key, 0, "add", nil}
 }
 
 func runMap(c chan request) {
 	m := make(map[int] int)
 	for {
 		req := <- c
 		switch req.op {
 		  case "set":
 		    m[req.key] = 0
 		  case "get":
 		    req.ret <- m[req.key]
 		  case "add":
 		    m[req.key] += 1
 		}
 	}
 }
 
 func main() {
 	m := make(chan request)
 	
 	go runMap(m)
 	set(m, 1 )
 	set(m, 2)
 	
 	var wg sync.WaitGroup
 	wg.Add(2)
 	go func() {
 	  defer wg.Done()	  
 	  for i := 0; i< 100000; i++ {
 	    add(m, 1)
 	  }
 	}()
 	
 	go func() {
 	  defer wg.Done()	  
 	  for i := 0; i< 200000; i++ {
 	    add(m, 2)
 	  }
 	}()
 	wg.Wait()
 
  
 	println(get(m, 1))
 	println(get(m, 2))
 	
 	  
 }
 



Оставьте свой комментарий !

Ваше имя:
Комментарий:
Оба поля являются обязательными

 Автор  Комментарий к данной статье