24 mar. 2013

Programmation concurrente : appels uniques, verrous, conditions

thread.jpg

L'article précédent a introduit les classes std::thread et std::atomic<> proposées par le standard C++11. La première de ces classes permet la création et la gestion de processus légers ; la seconde est utilisée pour accéder de manière atomique à une donnée particulière, ce qui permet d'implémenter certains algorithmes spécifiques (le spin lock est ainsi donné en exemple).

Ce nouveau billet va maintenant s'intéresser à deux autres primitives de synchronisations très utiles dans le domaine de la programmation concurrente : les verrous d'exclusion mutuelle (mutex) et les variables de condition (condition variables). Ces deux sujets seront traités après l'introduction d'un autre point qui, vous allez voir, va se révéler très, très intéressant.

Une fois, ça suffit : les appels uniques

Supposons un cas couramment rencontré dans le domaine de la programmation concurrent : mon programme crée N thread identiques (N >= 2) qui s'exécutent en parallèle. Un action particulière a lieu et tous les threads en sont averti. En réponse à se traitement, je dois appeler une fonction particulière. Cependant, je dois être sûr que cette fonction n'est appelée avec succès qu'une seule et unique fois - en cas d'échec, un autre thread a le droit de l'appeler. Tant que ce traitement spécifique ne s'est pas terminé avec succès, alors aucun autre traitement n'est engagé.

L'algorithme classique qui sera mis en oeuvre est grosso-modo le suivant :

void object::thread_func()
{
    while (!shall_stop()) {
        m = q.peek_msg();
        if (m == SOME_ACTION) {
            lock_job_done_mutex();
            if (!job_done) {
                    if (do_the_job_once()) {
                        job_done = true;
                        q.pop_msg();
                    }
            }
            unlock_counter_mutex();
        }
    }
}

On peut bien évidemment utiliser une exception pour signifier une erreur dans la fonction do_the_job_once().

Le problème avec cette méthode est la quantité de code nécessaire à son exécution : il faut un mutex, plusieurs variables, et jouer avec l'état de chacune des variables pour produire l'effet escompté. Au final, le code écrit est complexe et cryptique, avec trois niveaux de test - on est loin d'une solution claire et élégante.

Ce type de code est suffisamment courant pour avoir entraîné dans l'API POSIX thread l'ajout d'une fonction spéciale nommée pthread_once(). Cette fonction a pour but d'assurer qu'une fonction passée en paramètre n'est exécutée qu'une seule et unique fois. La fonction se base sur l'utilisation d'un drapeau atomique (le once control), et applique l'algorithme suivant :

 fonction pthread_once(once_control, routine)
   si (once_control n'est pas SET)
   alors
       appel de routine
       once_control = SET
   fin-si

Bien évidemment, les accès à once_control sont atomiques et protégés par les barrières correspondantes - de manière à assurer que la routine n'est appelée qu'une seule et unique fois.

S'inspirant largement de cette fonction, le standard C++11 a introduit un mécanisme similaire, sous la forme d'une structure std::once_flag et d'une fonction std::call_once().

namespace std {
    struct once_flag;
template<class Callable, class ...Args> void call_once(once_flag& flag, Callable func, Args&&... args); }

Seulement, là où pthread_once() n'est pas impacté par une erreur se produisant dans la routine (celle-ci ne renvoie rien), il était impossible d'ignorer dans std::call_once() un point très sensible : la fonction passée en paramètre peut générer une exception. Dans ce cas, l'exception est traitée par le code appelant, de la même manière qu'une exception classique. Cependant, la levée d'une exception dans la routine appelée a pour conséquence que std::call_once() ne peut pas terminer son traitement, et l'instance de std::once_flag n'est donc pas mise dans l'état nécessaire pour empêcher un nouvel appel de la routine. La norme C++11 va donc plus loin que le comité POSIX pour définir le comportement complet de son implémentation std::call_once(). Pour cela, la norme définit plusieurs termes :

  • une exécution dite passive de call_once n'appelle pas le callable passé en paramètre ;
  • une exécution dite active de call_once appelle le callable passé en paramètre ;
  • une exécution exceptionnelle est une exécution active qui lève une exception ;
  • une exécution returning est une exécution active qui se termine sans lever d'exception ;

Une fois ces termes définis, on peut définir à son tour le comportement de std::call_once() :

  • une exécution exceptionnelle propage l'exception à l'appelant de std::call_once() ;
  • si une exécution returning existe, alors c'est la dernière exécution returning de std::call_once() ; ce qui signifie que le callable func n'est exécuté avec succès qu'une seule fois.
  • il ne peut y avoir d'exécution passive que s'il y a eu au préalable une exécution returning ;

Pour bien comprendre ce comportement, prenons le cas où 3 threads tentent d'exécuter le code de func au même moment.

Le thread 1 entre en exécution active, mais échoue et génère une exception. Pour que le thread 2 passe en exécution active, il faut qu'il soit en attente avant l'appel de func, et que cette attente prenne fin avant lorsque la première exécution exceptionnelle se termine. Dans ce cas, le thread 2 entre en exécution active, et exécute correctement func - c'est la première et dernier exécution returning. Le thread 3 et tous les threads suivants sont en exécution passive.

L'algorithme peut être décrit sous cette forme :

// state est une variable de documentation, sans utilité particulière
state = none; 
if (once_flag != DONE) {
    // pas encore de type, vu qu'on va probablement se mettre en attente
    state = execution; 
    scoped_lock lock(once_flag.mutex);
    // on doit revérifier once_flag, car une execution active peut s'être
    // terminée correctement (et once_flag peut avoir changé d'état)
    if (once_flag != DONE) {
        // on va exécuter func(), donc on passe en exécution active
        state = active_execution;
        try {
            func(params...);
        } catch (...) {
            // exception ! 
            state = exceptional_execution;
            throw;
        }
        once_flag = DONE;
        state = returning_execution;
    } else {
        state = passive_execution;
    }
}

Voilà pour l'algorithme. On voit qu'il est tout de même relativement complexe. Ceci dit, son utilisation est extrêmement simple, car on a besoin de manipuler que deux entités : une instance de std::once_flag, et la fonction std::call_once(). Si on reprends notre exemple ci-dessus, on a :

// l'instanciation de once_flag n'est pas atomique
std::once_flag job_flag;
void object::thread_func() { while (!shall_stop()) { m = peek_msg(); if (m == SOME_ACTION) std::call_once(job_flag, [](msgqueue& q) { do_the_job_once(); q.pop_msg(); }, std::ref(q)); } }

Et c'est tout[1].

Il reste que la fonction que vous appelez peut générer une exception que vous ne voulez pas voir (considérant alors que même dans ce cas, l'exécution s'est bien passée), ou renvoyer une erreur alors qu'il devrait générer une exception (c'est le cas pour de nombreuses fonctions écrites en C). Pour passer outre ces petites limitations, n'hésitez pas à faire comme je viens de le faire, c'est à dire de passer par une expression lambda.

Chacun son tour : les mutex

On a vu dans un article précédent ce qu'était, en essence, un mutex, et comment les mutex étaient implémentés vu de l'OS. Histoire de rendre cet article plus clair, un bref rappel s'impose.

Un mutex (pour MUTual EXclusion) est un mécanisme de synchronisation basé sur la notion de jeton unique. Lorsque qu'un utilisateur a besoin du jeton, il le demande au mutex. Deux cas peuvent alors se produire :

  • soit le jeton est disponible, et dans ce cas le demandeur le récupère ;
  • soit le jeton n'est pas disponible, et dans ce cas le demandeur attends qu'il le devienne.

L'acquisition du jeton et sa libération est une opération atomique - mais une fois cette acquisition effectuée, l'OS en est averti - pour la simple et bonne raison que c'est au niveau de l'OS que se passe l'attente si le jeton n'est pas immédiatement disponible.

On se sert d'un mutex pour rendre atomique l'accès à une ressource partagée entre deux ou plus de deux threads. Le fait d'acquérir le mutex permet à un thread de s'assurer qu'aucun autre thread n'est en train d'accéder à la ressource (que ce soit en lecture ou en écriture). Bien évidemment, il faut toujours s'assurer qu'on libère le mutex qu'on acquiert, sans quoi les autres threads sont irrémédiablement bloqués.

Au niveau de son interface, un objet qui agirait comme un mutex a l'obligation de proposer au moins deux méthodes :

void lock();
void unlock() noexcept;

La méthode lock() peut lever une exception, à la condition de garantir que le mutex n'a pas été acquis par le thread courant.

Un objet qui propose au moins ces deux méthodes est un objet dit BasicLockable. Pour être Lockable, un objet doit implémenter une troisième méthode :

bool try_lock();

Comme lock(), cette fonction peut générer une exception (à la condition que l'acquisition ne se soit pas faîte auparavant). La fonction doit renvoyer true en cas d'acquisition, et false dans tous les autres cas.

Si l'objet propose en outre une surcharge

bool try_lock(duration-type rel_time);

Alors cet objet est TimedLockable. rel_time est un timeout - tant que ce timeout n'est pas expiré, la tentative d'aquisition continue. A l'expiration, la fonction renvoie false. Si l'acquisition a été effectuée, la fonction renvoie true. Dans tous les cas, la fonction peut générer une exception - à la condition que l'acquisition ne soit pas encore effective au moment où l'exception est levée.

La librairie standard C++11 définit plusieurs types de mutex ; tous ces types (on va les détailler ci-dessous) sont DefaultConstructible[2], Destructible et Lockable. Il ne doit être ni Copiable, ni Moveable ; en cas d'erreur sur son initialisation, le mutex doit lever une exception du type std::system_error[3].

Les différents types de mutex définis par le standard C++11 sont les suivants

  • std::mutex est un mutex classique ;
  • std::recursive_mutex est un mutex qui peut être acquis plusieurs fois par le même thread (l'acquisition par un autre thread reste bloquant)
  • std::timed_mutex est un std::mutex qui est TimedLockable
  • std::timed_recursive_mutex est un std::recursive_mutex qui est TimedLockable.

Quel que soit le type de mutex choisi, on a accès aux méthodes lock(), unlock() et try_lock(). Les versions TimedLockable proposent en outre la méthode try_lock(rel_time).

Bien évidemment, on utilise rarement les mutex seuls - dans ce cas, il faudrait gérer soit-même l'acquisition et la libération des mutex, en prenant en compte les exceptions et les erreurs qui peuvent survenir pendant le traitement. On utilise RAII pour ne pas avoir besoin de gérer ces différents cas :

  • dans le constructeur de l'objet, on acquiert le mutex ;
  • dans son destructeur, on le libère.

Ce faisant, on s'assure que le mutex sera toujours libéré, même dans les cas de levée d'exception imprévus.

Un objet qui effectue ces opérations est communément appelée un verrou automatique (lock). La librairie standard C++11 propose plusieurs types de verrous automatiques :

  • std::lock_guard est un verrou basique qui applique exactement la technique décrite ci-dessus. L'objet verrouillé doit être un objet BasicLockable.
  • std::unique_lock est un verrou Movable. Les cas d'utilisation de ce type de verrou sont plus rares.

La classe std::lock_guard a le synopsis suivant :

namespace std {
    template <class Mutex>
    class lock_guard {
    public:
        typedef Mutex mutex_type;
explicit lock_guard(mutex_type& m); lock_guard(mutex_type& m, adopt_lock_t); ~lock_guard();
lock_guard(lock_guard const&) = delete; lock_guard& operator=(lock_guard const&) = delete; }; }

On voit que cette classe ne propose aucune méthode en dehors de ses constructeurs et de son destructeur. Son utilisation est donc basique :

std::mutex m;
void function() { std::lock_guard lock(m); // on fait ce qu'on veut avec la ressource protégée // ... } // lock libère m automatiquement

On remarque aussi un curieux constructeur, prenant un paramètre du type adopt_lock_t. La valeur std::adopt_lock définie par la librairie standard est un tag spécifique qui dit "le thread a déjà acquis ce mutex, donc n'essaie pas de l'acquérir de nouveau". Ce constructeur permet d'écrire ce type de code :

std::mutex m;
void function() { m.lock(); // on fait ce qu'on veut avec la ressource protégée // ...
std::lock_guard lock(m, std::adopt_lock); // lock devient propriétaire de m mais ne change pas // son état (m yant déjà été acquis plus haut). // ... } // lock libère m automatiquement

En plus de ces deux classes de verrou, la librairie standard propose deux fonctions plus génériques :

template <class L1, class L2, class... L3> 
    int try_lock(L1&, L2&, L3&...);
template <class L1, class L2, class... L3> void lock(L1&, L2&, L3&...);

Ces deux fonctions sont utilisée dans le cas particulier où on a besoin de plusieurs mutex pour protéger une ressource particulière. Par exemple, une ressource R1 est protégée par un mutex m1, la ressource R2 est protégée par le mutex m2, et il existe une dépendance entre R1 et R2. L'utilisation des fonctions ci-dessus permet de s'assurer de l'ordre d'acquisition de ces mutex. Prenons le cas proposé, et voyons ce qui peut arriver :

// thread 1
{
    std::lock_guard lock1(m1); 
    R1->update();
    {
        std::lock_guard lock2(m2);
        R2->data = R1->result;
    }
}
// thread 2 { std::lock_guard lock2(m2); std::lock_guard lock1(m1); if (R2->data != R1->result) R2->do_something(R1); }

Dans ce cas, il peut exister ce qu'on nomme un deadlock - un verrou mortel. thread 1 peut avoir acquis m1 et, dans le même temps, thread 2 peut avoir acquis m2. Du coup, thread 1 est en attente de m2 avant de pouvoir terminer son traitement et libérer m1. Mais dans le même temps, thread 2 est en attente de m1 pour pouvoir terminer son traitement et libérer m2. Aucune des conditions de libération ne peut être atteinte et les deux threads sont bloqués indéfiniment.

// thread 2
{
    std::lock_guard lock1(m1);
    std::lock_guard lock2(m2);
    if (R2->data != R1->result) 
        R2->do_something(R1);
}

Dans ce cas, thread 2 est soit bloqué sur l'acquisition de m1 (et dans ce cas, thread 1 va prendre à la fois m1 et m2, puis libérer les deux mutex), soit il acquiert m1 - et dans ce cas thread 1 est bloqué que l'acquisition de m1 et thread 2 peut acquérir m2 et terminer son traitement.

Les fonctions présentées ci-dessus permettent d'implémenter ce scénario de manière simple :

// thread 1
std::lock(m1, m2); 
// ...
// thread 2 std::lock(m1, m2);

On s'assure, avec un écriture simple, que l'ordre d'acquisition des verrous est toujours le même. Dans le cas où il diffère, une simple vérification des autres threads permet de voir le problème et de le corriger.

A noter qu'il n'y a pas de fonction std::unlock() correspondante. Il est de la responsabilité du programmeur de dévérouiller les mutex ainsi traités (ce n'est pas nécessairement trivial : par exemple, une exception peut être générée lors de l'acquisition de m2 ; m1 doit alors être libéré, mais pas m2)). A noter que c'est là un cas où l'utilisation de std::unique_lock() peut être intéressante.

Pour éliminer ce problème, il faut que chacun des thread acquiert les mutex dans le même ordre.

Attente d'évènements : les variables de conditions

Une variable de condition (autrement appelées variables condition) est un mécanisme de synchronisation utilisé pour signaler un évènement. Son principe de fonctionnement est le suivant :

  • une variable de condition peut exister dans deux états différents
    • l'état non signalé ;
    • l'état signalé ;
  • N threads peuvent attendre que la variable de condition passe dans l'état signalé.

L'API Posix implémente les variables de condition via les fonctions pthread_cond_init(), pthread_cond_signal(), pthread_cond_wait()... Sous Windows, l'API CreateEvent(), SetEvent() et les fonctions génériques WaitForSingleObject() et WaitForMultipleObject() peuvent être utilisée dans ce contexte (avec une légère différence sémantique).

L'interface proposée par la librairie pthread de Posix se distingue de celle proposée par Windows sur un point très particulier : la variable de condition est intimement liée à un mutex - ce n'est pas le cas sous Windows, ou le HEVENT correspondant n'a pas de dépendance publique à un mutex. La librairie reprends ce point particulier.

La librairie standard C++11 propose en fait deux types de variables de condition : std::condition_variable et std::condition_variable_any. Les deux types sont très similaires dans leur interface et dans leur utilisation - le permier, que je vais traiter ici, ne se base que sur des std::unique_lock<>, tandis que le second accepte tout type de lock.

namespace std {
    class condition_variable {
    public:
        condition_variable();
        ~condition_variable();
condition_variable(const condition_variable&) = delete; condition_variable& operator=(const condition_variable&) = delete;
void notify_one() noexcept; void notify_all() noexcept;
void wait(unique_lock<mutex>& lock);
template <class Predicate> void wait(unique_lock<mutex>& lock, Predicate pred);
template <class Clock, class Duration> cv_status wait_until(unique_lock<mutex>& lock, const chrono::time_point<Clock, Duration>& abs_time);
template <class Clock, class Duration, class Predicate> bool wait_until(unique_lock<mutex>& lock, const chrono::time_point<Clock, Duration>& abs_time, Predicate pred);
template <class Rep, class Period> cv_status wait_for(unique_lock<mutex>& lock, const chrono::duration<Rep, Period>& rel_time);
template <class Rep, class Period, class Predicate> bool wait_for(unique_lock<mutex>& lock, const chrono::duration<Rep, Period>& rel_time, Predicate pred);
typedef implementation-defined native_handle_type; native_handle_type native_handle(); }; }

En examinant la classe, on remarque plusieurs choses :

  1. l'initialisation d'une variable de condition n'est pas dépendante d'une valeur proposée par l'utilisateur - à la création, la variable est dans l'état non signalé.
  2. chaque fonction wait... existe sous deux formes : une forme sans prédicat, et une forme avec prédicat. La forme avec prédicat exécute essentiellement l'algorithme suivant :
while (!pred())
    wait(lock);

Deux fonctions de signalisation sont proposées :

  • notify_one() réveillle un seul thread qui est en attente sur la variable de condition au moment où celle-ci est signalée.
  • notify_all() réveille tous les threads qui sont en attente sur la variable de condition au moment où celle-ci est signalée.

La classe std::condition_variable ne propose pas d'autres services - que des services d'attente et de signalement. Son utilisation (sans prédicat ou avec prédicat) est par conséquent très simple :

// déclarations
std::mutex mcond;
std::unique_lock<std::mutex> ulock;
std::condition_variable cond;
// thread 1 if (something_happened) cond.notify_one();
// thread 2, ..., N ulock.lock(); cond.wait(ulock); // bloquant // là: quelque chose s'est passé ulock.unlock();

La version avec prédicat n'est pas tellement différente :

// thread 2, ..., N
ulock.lock();
cond.wait(ulock, []() -> bool {
    return (something == yes) ? true : false;
});
ulock.unlock();

Les autres fonctions d'attente (wait_until() et wait_for()) sont par essence des fonctions qui peuvent redonner le contrôle à l'appelant après un timeout - fort heureusement, ces fonctions renvoient un booléen mis à false dans ce cas, ce qui permet de différencier une attente écourtée par le signalement de la variable de condition d'une attente s'étant terminée à la fin du timeout.

Il y a quelque chose que vous avez du voir dans le code ci-dessus : on entre dans une fonction d'attente avec le verrou acquis. La raison tiens au fonctionnement des variables de condition.

Au niveau de sa sémantique, il faut comprendre la variable de condition comme une variable quelconque. Comme toutes les autres ressources partagées entre deux thread, son accès doit être protégé. L'algorithme des fonctions d'attente est (grosso-modo) équivalent à :

void wait(lock)
{
    do {
        this_thread::yield();
    } while (internal_condition != signaled)
}

Le problème est alors que si l'accès à la variable est verrouillé, alors les autres threads ne peuvent plus acquérir la variable pour la signaler. Du coup, on est obligé de déverrouiller la variable de condition pendant le corps de la boucle (et, nécessairement, on doit la verrouiller de nouveau après).

void wait(lock)
{
    do {
        lock.unlock();
        this_thread::yield();
        lock.lock();
    } while (internal_condition != signaled)
}

Le fait de forcer le scheduler de l'OS à passer à un autre thread permet de s'assurer qu'à un moment ou à un autre, le thread qui va signaler la variable de condition va pouvoir le faire.

Conclusion

Au cours des semaines passées, nous avons pu aborder la programmation concurrente du point de vue de l'OS et de la machine, et plus récemment au niveau C++11. Nous avons abordé la création et l'utilisation de thread, de variables atomiques, (via ce billet) des variables de conditions et des verrous d'exclusion mutuelle. Ces quelques points sont généralement suffisant pour traiter n'importe quel type de problématiques liée à la programmation concurrente.

Ceci étant dit, nous n'en avons pas encore fini avec le sujet. Il reste encore un bon nombre de choses à explorer avant cela, et d'autres articles sont déjà prévus pour discuter de ces différents points. Nous nous reverrons donc très, très bientôt !

Notes

[1] si vous vous demandez pourquoi je passe q (la queue de messages) en paramètre, et non pas sous la forme d'une variable capturée, la réponse est simple : c'est parce qu'on ne peut pas capturer this, ni une variable membre de la classe. Quand au std::ref(), il est utilisé pour encapsuler l'objet q, qui doit être considéré comme une référence. Sans std::ref(), q est passé par valeur, et non pas par référence (à cause de l'implémentation de l'appel de func).

[2] si la type est X, alors X() est un expression valide.

[3] c'est notamment le cas sous Linux si vous oubliez l'option -pthread au moment de compiler le module et de linker le logiciel final.

Commentaires

1. Le lundi, mars 25 2013, 11:32 par gbdivers

Merci pour cet excellent article

Concernant les mutex (et les approches bloquantes en général), il y a une vidéo très intéressante à voir de Herb Sutter : C++ and Beyond 2012: Herb Sutter - C++ Concurrency. Je n'ai pas encore regardé les propositions pré-bristol du SG Concurrency, mais il est possible que les nouvelles classes proposées par Herb sont soient dans les proposals pour la prochaine norme.

Guillaume

2. Le lundi, mars 25 2013, 14:11 par Emmanuel Deloget

Bonjour Guillaume,

C'est moi qui te remercie pour tes relectures attentives - ça fait deux fois que tu me trouves des petites erreurs dans le code et dans le texte (erreurs maintenant corrigées).

J'en ai proffité pour regarder la vidéo de Herb Sutter, et j'avoue qu'il y a effectivement des choses très, très intéressantes :

  • les méthodes std::future::then() et std::future::when_any()/std::future::when_all() sont une très, très bonne idée
  • idem pour std::monitor<T>
  • Herb Sutter introduit un type worker_thread. Du au fait que je ne suivait la vidéo que par accoup, je n'ai pas vu s'il pensait à une classe de la librairie standard. Quoi qu'il en soit, ça me semble être une bonne chose aussi.

Je réfléchi de mon coté à une classe spin_lock (en version TimedLockable ou non ; je viens d'ailleurs de me rendre compte d'une déficience du standard vis à vis des concepts : nulle part le standard ne précise de cas où un appel système est interdit - ce qui a des conséquences importantes en termes de gestion de la concurrence ; j'en parlerai dans un prochain billet (pas le prochain, car il est déjà réservé pour parler de promesse, de futur, et d'asynchronisme (le vilain teasing))).

3. Le jeudi, mars 28 2013, 16:12 par gbdivers

J'ai trouvé : les fonctionnalités présentées par Herb Sutter dans le vidéo ont été proposé dans le draft N3558 : http://www.open-std.org/JTC1/SC22/W...
Bonne nouvelle donc

Ajouter un commentaire

Les commentaires peuvent être formatés en utilisant une syntaxe wiki simplifiée.

Fil des commentaires de ce billet