Je travaille sur une application threadée où un thread alimentera un Queue avec des objets à modifier et un certain nombre d'autres threads liront ensuite dans la file d'attente, feront les modifications et enregistreront les changements.

L'application n'aura pas besoin de beaucoup de concurrence, donc j'aimerais m'en tenir à une base de données SQLite. Voici un petit exemple illustrant l'application:

import queue
import threading
import peewee as pw

db = pw.SqliteDatabase('test.db', threadlocals=True)

class Container(pw.Model):
    contents = pw.CharField(default="spam")

    class Meta:
        database = db


class FeederThread(threading.Thread):

    def __init__(self, input_queue):
        super().__init__()

        self.q = input_queue

    def run(self):
        containers = Container.select()

        for container in containers:
            self.q.put(container)


class ReaderThread(threading.Thread):

    def __init__(self, input_queue):
        super().__init__()

        self.q = input_queue

    def run(self):
        while True:
            item = self.q.get()

            with db.execution_context() as ctx:
                # Get a new connection to the container object:
                container = Container.get(id=item.id)
                container.contents = "eggs"
                container.save()

            self.q.task_done()


if __name__ == "__main__":

    db.connect()
    try:
        db.create_tables([Container,])
    except pw.OperationalError:
        pass
    else:
        [Container.create() for c in range(42)]
    db.close()

    q = queue.Queue(maxsize=10)


    feeder = FeederThread(q)
    feeder.setDaemon(True)
    feeder.start()

    for i in range(10):
        reader = ReaderThread(q)
        reader.setDaemon(True)
        reader.start()

    q.join()

Basé sur les documents peewee, le multi-threading doit être pris en charge pour SQLite. Cependant, j'obtiens la tristement célèbre erreur peewee.OperationalError: database is locked avec la sortie d'erreur pointant vers la ligne container.save().

Comment puis-je contourner cela?

1
digitaldingo 22 juil. 2015 à 13:11

2 réponses

Meilleure réponse

J'ai été un peu surpris de voir cela échouer également, alors j'ai copié votre code et joué avec des idées différentes. Ce que je pense que le problème est, c'est que ExecutionContext() par défaut provoquera l'exécution du bloc encapsulé dans une transaction. Pour éviter cela, j'ai passé False dans les fils de discussion du lecteur.

J'ai également édité le chargeur pour consommer l'instruction SELECT avant de mettre des choses dans la file d'attente (list(Container.select())).

Ce qui suit fonctionne pour moi localement:

class FeederThread(threading.Thread):

    def __init__(self, input_queue):
        super(FeederThread, self).__init__()

        self.q = input_queue

    def run(self):
        containers = list(Container.select())

        for container in containers:
            self.q.put(container.id)  # I don't like passing model instances around like this, personal preference though

class ReaderThread(threading.Thread):

    def __init__(self, input_queue):
        super(ReaderThread, self).__init__()

        self.q = input_queue

    def run(self):
        while True:
            item = self.q.get()

            with db.execution_context(False):
                # Get a new connection to the container object:
                container = Container.get(id=item)
                container.contents = "nuggets"
                with db.atomic():
                    container.save()

            self.q.task_done()

if __name__ == "__main__":

    with db.execution_context():
        try:
            db.create_tables([Container,])
        except OperationalError:
            pass
        else:
            [Container.create() for c in range(42)]

    # ... same ...

Je ne suis pas entièrement satisfait de cela, mais j'espère que cela vous donnera des idées.

Voici un article de blog que j'ai écrit il y a quelque temps et qui contient quelques astuces pour améliorer la concurrence avec SQLite: http://charlesleifer.com/blog/sqlite-small-fast-reliable-choose-any-three-/

3
coleifer 22 juil. 2015 à 17:22

Avez-vous essayé le mode WAL?

Améliorer les performances INSERT par seconde de SQLite?

Vous devez être très prudent si vous avez un accès simultané à SQLite, car toute la base de données est verrouillée lorsque les écritures sont effectuées, et bien que plusieurs lecteurs soient possibles, les écritures seront verrouillées. Cela a été quelque peu amélioré avec l'ajout d'un WAL dans les nouvelles versions de SQLite.

Et

Si vous utilisez plusieurs threads, vous pouvez essayer d'utiliser le cache de page partagé, qui permettra aux pages chargées d'être partagées entre les threads, ce qui peut éviter des appels d'E / S coûteux.

0
Community 23 mai 2017 à 12:08