DeenruvDeenruv
Przewodnik programisty

Worker i kolejka zadań

Poznaj proces workera Deenruv i system kolejki zadań do obsługi zadań w tle

Worker Deenruv to proces Node.js odpowiedzialny za uruchamianie obliczeniowo intensywnych lub długotrwałych zadań w tle. Na przykład aktualizacja indeksu wyszukiwania lub wysyłanie e-maili. Uruchamianie takich zadań w tle pozwala serwerowi zachować responsywność, ponieważ odpowiedź może być zwrócona natychmiast, bez czekania na zakończenie wolniejszych zadań.

Innymi słowy, Worker wykonuje zadania (jobs), które zostały umieszczone w kolejce zadań (job queue).

Worker

Worker jest uruchamiany przez wywołanie funkcji bootstrapWorker() z tą samą konfiguracją, jaka jest przekazywana do głównej funkcji serwera bootstrap(). W standardowej instalacji Deenruv znajduje się to w pliku index-worker.ts:

src/index-worker.ts
import { bootstrapWorker } from '@deenruv/core';
import { config } from './deenruv-config';

bootstrapWorker(config)
    .then(worker => worker.startJobQueue())
    .catch(err => {
        console.log(err);
    });

Architektura

Worker jest samodzielną aplikacją NestJS. Oznacza to, że jest niemal identyczny z główną aplikacją serwera, ale nie posiada warstwy sieciowej nasłuchującej żądań. Serwer komunikuje się z workerem za pośrednictwem architektury „kolejki zadań". Dokładna implementacja kolejki zadań zależy od skonfigurowanej JobQueueStrategy, ale domyślnie worker odpytuje bazę danych w poszukiwaniu nowych zadań.

Wiele workerów

Możliwe jest uruchamianie wielu workerów równolegle, aby lepiej radzić sobie z dużym obciążeniem. Używając konfiguracji JobQueueOptions.activeQueues, można nawet dedykować określone workery do jednego lub więcej konkretnych typów zadań. Na przykład, jeśli Twoja aplikacja wykonuje transkodowanie wideo, możesz chcieć skonfigurować dedykowanego workera wyłącznie do tego zadania:

src/transcoder-worker.ts
import { bootstrapWorker, mergeConfig } from '@deenruv/core';
import { config } from './deenruv-config';

const transcoderConfig = mergeConfig(config, {
    jobQueueOptions: {
        activeQueues: ['transcode-video'],
    },
});

bootstrapWorker(transcoderConfig)
    .then(worker => worker.startJobQueue())
    .catch(err => {
        console.log(err);
    });

Uruchamianie zadań w głównym procesie

Możliwe jest uruchamianie zadań z kolejki w głównym procesie serwera. Jest to używane głównie do testowania i zadań zautomatyzowanych i nie jest zalecane do produkcji, ponieważ niweluje korzyści z uruchamiania długich zadań poza głównym procesem. Aby to zrobić, musisz ręcznie uruchomić JobQueueService:

src/index.ts
import { bootstrap, JobQueueService } from '@deenruv/core';
import { config } from './deenruv-config';

bootstrap(config)
    .then(app => app.get(JobQueueService).start())
    .catch(err => {
        console.log(err);
        process.exit(1);
    });

ProcessContext

Czasami Twój kod może potrzebować wiedzieć, czy jest uruchamiany jako część procesu serwera czy workera. W takim przypadku możesz wstrzyknąć provider ProcessContext i odpytać go w następujący sposób:

src/plugins/my-plugin/services/my.service.ts
import { Injectable, OnApplicationBootstrap } from '@nestjs/common';
import { ProcessContext } from '@deenruv/core';

@Injectable()
export class MyService implements OnApplicationBootstrap {
    constructor(private processContext: ProcessContext) {}

    onApplicationBootstrap() {
        if (this.processContext.isServer) {
            // code which will only execute when running in
            // the server process
        }
    }
}

Kolejka zadań

Deenruv używa kolejki zadań do obsługi przetwarzania określonych zadań, które są zazwyczaj zbyt wolne, aby uruchamiać je w normalnym cyklu żądanie-odpowiedź. Normalny cykl żądanie-odpowiedź wygląda tak:

W normalnym cyklu żądanie-odpowiedź wszystkie pośrednie zadania (wyszukiwanie danych w bazie, wykonywanie logiki biznesowej itp.) wykonują się przed zwróceniem odpowiedzi. W przypadku większości operacji jest to w porządku, ponieważ te pośrednie zadania są bardzo szybkie.

Niektóre operacje jednak wymagają wykonania znacznie dłuższych zadań. Na przykład aktualizacja indeksu wyszukiwania dla tysięcy produktów może zająć minutę lub więcej. W takim przypadku zdecydowanie nie chcemy opóźniać odpowiedzi, dopóki przetwarzanie się nie zakończy. Właśnie do tego służy kolejka zadań:

Do czego Deenruv używa kolejki zadań?

Domyślnie Deenruv używa kolejki zadań do następujących celów:

  • Przebudowa indeksu wyszukiwania
  • Aktualizacja indeksu wyszukiwania po zmianach w produktach, wariantach, zasobach itp.
  • Aktualizacja zawartości kolekcji
  • Wysyłanie e-maili transakcyjnych

Jak działa kolejka zadań?

Ten diagram ilustruje mechanizm kolejki zadań:

Serwer dodaje zadania do kolejki. Worker następnie pobiera te zadania z kolejki i przetwarza je sekwencyjnie, jedno po drugim (możliwe jest zwiększenie przepustowości kolejki zadań poprzez uruchamianie wielu workerów lub zwiększenie współbieżności pojedynczego workera).

JobQueueStrategy

Właściwa część kolejki jest definiowana przez skonfigurowaną JobQueueStrategy.

Jeśli nie zdefiniowano żadnej strategii, Deenruv używa magazynu w pamięci do przechowywania zawartości każdej kolejki. Choć ma to zaletę braku wymagań dotyczących zewnętrznych zależności, nie nadaje się do produkcji, ponieważ po zatrzymaniu serwera cała kolejka zostanie utracona, a oczekujące zadania nigdy nie zostaną przetworzone. Ponadto nie może być używana przy uruchamianiu workera jako osobnego procesu.

Lepszą alternatywą jest użycie DefaultJobQueuePlugin (który będzie używany w standardowej instalacji @deenruv/create), konfigurującego Deenruv do używania SqlJobQueueStrategy. Ta strategia używa bazy danych jako kolejki, co oznacza, że nawet jeśli serwer Deenruv się zatrzyma, oczekujące zadania zostaną zachowane i po ponownym uruchomieniu będą przetworzone.

Możliwe jest również zaimplementowanie własnej JobQueueStrategy, aby wykorzystać inne technologie. Przykłady obejmują RabbitMQ, Google Cloud Pub Sub i Amazon SQS. Implementacja niestandardowej strategii opartej na jednej z nich może mieć sens, jeśli domyślne podejście oparte na bazie danych nie spełnia wymagań wydajnościowych.

Wydajność kolejki zadań

W większych projektach Deenruv często definiuje się wiele niestandardowych kolejek zadań. Przy użyciu DefaultJobQueuePlugin z wieloma kolejkami wydajność może być obniżona. Wynika to z tego, że SqlJobQueueStrategy używa pollingu do sprawdzania nowych zadań w bazie danych. Każda kolejka domyślnie odpytuje bazę danych co 200ms. Jeśli więc istnieje 10 kolejek, daje to stałe 50 zapytań/sekundę.

W takim przypadku zaleca się wypróbowanie BullMQJobQueuePlugin, który używa wydajnej strategii push opartej na Redis.

Używanie kolejek zadań w pluginie

Jeśli Twój plugin obejmuje długotrwałe zadania, możesz również skorzystać z kolejki zadań.

Rzeczywisty przykład tego znajdziesz w kodzie źródłowym EmailPlugin

Załóżmy, że budujesz plugin, który pozwala na podanie URL do wideo, a następnie to wideo jest transkodowane do formatu odpowiedniego do streamingu na stronie sklepu. Jest to długotrwałe zadanie, które nie powinno blokować głównego wątku, więc użyjemy kolejki zadań do uruchomienia tego zadania na workerze.

Najpierw dodamy nową mutację do schematu Admin API:

src/plugins/product-video/api/api-extensions.ts
import gql from 'graphql-tag';

export const adminApiExtensions = gql`
    extend type Mutation {
        addVideoToProduct(productId: ID!, videoUrl: String!): Job!
    }
`;

Resolver wygląda następująco:

src/plugins/product-video/api/product-video.resolver.ts
import { Args, Mutation, Resolver } from '@nestjs/graphql';
import { Allow, Ctx, RequestContext, Permission, RequestContext } from '@deenruv/core';
import { ProductVideoService } from '../services/product-video.service';

@Resolver()
export class ProductVideoResolver {
    constructor(private productVideoService: ProductVideoService) {}

    @Mutation()
    @Allow(Permission.UpdateProduct)
    addVideoToProduct(@Ctx() ctx: RequestContext, @Args() args: { productId: ID; videoUrl: string }) {
        return this.productVideoService.transcodeForProduct(args.productId, args.videoUrl);
    }
}

Resolver jedynie definiuje sposób obsługi nowej mutacji addVideoToProduct, delegując właściwą pracę do ProductVideoService.

Tworzenie kolejki zadań

Użyj npx deenruv add, aby łatwo dodać kolejkę zadań do serwisu.

JobQueueService tworzy i zarządza kolejkami zadań. Kolejka jest tworzona podczas uruchamiania aplikacji (patrz zdarzenia cyklu życia NestJS), a następnie możemy użyć metody add() do dodawania zadań do kolejki.

src/plugins/product-video/services/product-video.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { JobQueue, JobQueueService, ID, Product, TransactionalConnection } from '@deenruv/core';
import { transcode } from 'third-party-video-sdk';

@Injectable()
class ProductVideoService implements OnModuleInit {
    private jobQueue: JobQueue<{ productId: ID; videoUrl: string }>;

    constructor(
        private jobQueueService: JobQueueService,
        private connection: TransactionalConnection,
    ) {}

    async onModuleInit() {
        this.jobQueue = await this.jobQueueService.createQueue({
            name: 'transcode-video',
            process: async job => {
                // Inside the `process` function we define how each job
                // in the queue will be processed.
                // In this case we call out to some imaginary 3rd-party video
                // transcoding API, which performs the work and then
                // returns a new URL of the transcoded video, which we can then
                // associate with the Product via the customFields.
                const result = await transcode(job.data.videoUrl);
                await this.connection.getRepository(Product).save({
                    id: job.data.productId,
                    customFields: {
                        videoUrl: result.url,
                    },
                });
                // The value returned from the `process` function is stored as the "result"
                // field of the job (for those JobQueueStrategies that support recording of results).
                //
                // Any error thrown from this function will cause the job to fail.
                return result;
            },
        });
    }

    transcodeForProduct(productId: ID, videoUrl: string) {
        // Add a new job to the queue and immediately return the
        // job itself.
        return this.jobQueue.add({ productId, videoUrl }, { retries: 2 });
    }
}

Zwróć uwagę na parametr typu generycznego JobQueue:

JobQueue<{ productId: ID; videoUrl: string }>;

Oznacza to, że gdy wywołujemy jobQueue.add(), musimy przekazać obiekt tego typu. Dane te będą następnie dostępne w funkcji process jako właściwość job.data.

Dane przekazywane do jobQueue.add() muszą być serializowalne do JSON, ponieważ są konwertowane na ciąg znaków podczas przechowywania w kolejce zadań. Dlatego należy unikać przekazywania złożonych obiektów, takich jak instancje Date, Buffery itp.

ProductVideoService odpowiada za konfigurację JobQueue i dodawanie zadań do tej kolejki. Wywołanie

productVideoService.transcodeForProduct(id, url);

doda zadanie transkodowania do kolejki.

Kod pluginu zazwyczaj jest wykonywany zarówno na serwerze, jak i na workerze. Dlatego czasami trzeba jawnie sprawdzić, w jakim kontekście się znajdujesz. Można to zrobić za pomocą providera ProcessContext.

Na koniec ProductVideoPlugin łączy wszystko w całość — rozszerza GraphQL API, definiuje wymagane CustomField do przechowywania URL transkodowanego wideo oraz rejestruje nasz serwis i resolver. PluginCommonModule jest importowany, ponieważ eksportuje JobQueueService.

src/plugins/product-video/product-video.plugin.ts
import gql from 'graphql-tag';
import { PluginCommonModule, VendurePlugin } from '@deenruv/core';
import { ProductVideoService } from './services/product-video.service';
import { ProductVideoResolver } from './api/product-video.resolver';
import { adminApiExtensions } from './api/api-extensions';

@DeenruvPlugin({
    imports: [PluginCommonModule],
    providers: [ProductVideoService],
    adminApiExtensions: {
        schema: adminApiExtensions,
        resolvers: [ProductVideoResolver],
    },
    configuration: config => {
        config.customFields.Product.push({
            name: 'videoUrl',
            type: 'string',
        });
        return config;
    },
})
export class ProductVideoPlugin {}

Przekazywanie RequestContext

Często zachodzi potrzeba przekazania obiektu RequestContext do funkcji process zadania, ponieważ ctx jest wymagany przez wiele metod serwisów Deenruv, których możesz używać wewnątrz funkcji process. Jednak sam obiekt RequestContext nie jest serializowalny, więc nie może być bezpośrednio przekazany do metody JobQueue.add(). Zamiast tego możesz serializować RequestContext za pomocą metody RequestContext.serialize(), a następnie deserializować go w funkcji process za pomocą statycznej metody deserialize:

import { Injectable, OnModuleInit } from '@nestjs/common';
import {
    JobQueue,
    JobQueueService,
    Product,
    TransactionalConnection,
    SerializedRequestContext,
    RequestContext,
} from '@deenruv/core';

@Injectable()
class ProductExportService implements OnModuleInit {
    private jobQueue: JobQueue<{ ctx: SerializedRequestContext }>;

    constructor(
        private jobQueueService: JobQueueService,
        private connection: TransactionalConnection,
    ) {}

    async onModuleInit() {
        this.jobQueue = await this.jobQueueService.createQueue({
            name: 'export-products',
            process: async job => {
                const ctx = RequestContext.deserialize(job.data.ctx);
                const allProducts = await this.connection.getRepository(ctx, Product).find();
                // ... logic to export the product omitted for brevity
            },
        });
    }

    exportAllProducts(ctx: RequestContext) {
        return this.jobQueue.add({ ctx: RequestContext.serialize(ctx) });
    }
}

Obsługa anulowania zadania

Administrator może anulować uruchomione zadanie. Spowoduje to, że skonfigurowana strategia kolejki zadań oznaczy zadanie jako anulowane, ale samo w sobie nie zatrzyma uruchomionego zadania. Wynika to z tego, że kolejka zadań nie ma bezpośredniej kontroli nad funkcją process po jej rozpoczęciu.

To do funkcji process należy sprawdzenie, czy zadanie zostało anulowane, i zatrzymanie przetwarzania. Można to zrobić, sprawdzając właściwość job.state, a jeśli zadanie zostało anulowane, funkcja process może rzucić wyjątek wskazujący, że zadanie zostało przerwane przez wcześniejsze anulowanie:

import { Injectable, OnModuleInit } from '@nestjs/common';
import {
    JobQueue,
    JobQueueService,
    Product,
    TransactionalConnection,
    SerializedRequestContext,
    RequestContext,
    Job,
    JobState,
} from '@deenruv/core';
import { IsNull } from 'typeorm';

@Injectable()
class ProductExportService implements OnModuleInit {
    private jobQueue: JobQueue<{ ctx: SerializedRequestContext }>;

    constructor(
        private jobQueueService: JobQueueService,
        private connection: TransactionalConnection,
    ) {}

    async onModuleInit() {
        this.jobQueue = await this.jobQueueService.createQueue({
            name: 'export-products',
            process: async job => {
                const ctx = RequestContext.deserialize(job.data.ctx);
                const allProducts = await this.connection.getRepository(ctx, Product).find({
                    where: { deletedAt: IsNull() },
                });
                let successfulExportCount = 0;
                for (const product of allProducts) {
                    if (job.state === JobState.CANCELLED) {
                        // If the job has been cancelled, stop processing
                        // to prevent unnecessary work.
                        throw new Error('Job was cancelled');
                    }

                    // ... logic to export the product omitted for brevity
                    successfulExportCount++;
                }
                return { successfulExportCount };
            },
        });
    }

    exportAllProducts(ctx: RequestContext) {
        return this.jobQueue.add({ ctx: RequestContext.serialize(ctx) });
    }
}

Subskrybowanie aktualizacji zadania

Podczas tworzenia nowego zadania za pomocą JobQueue.add() możliwe jest subskrybowanie aktualizacji tego zadania (postęp i zmiany statusu). Pozwala to na przykład tworzyć resolvery, które mogą zwracać wyniki danego zadania.

W powyższym przykładzie transkodowania wideo moglibyśmy zmodyfikować wywołanie transcodeForProduct() w następujący sposób:

src/plugins/product-video/services/product-video.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { of } from 'rxjs';
import { map, catchError } from 'rxjs/operators';
import { ID, Product, TransactionalConnection } from '@deenruv/core';

@Injectable()
class ProductVideoService implements OnModuleInit {
    // ... omitted (see above)

    transcodeForProduct(productId: ID, videoUrl: string) {
        const job = await this.jobQueue.add({ productId, videoUrl }, { retries: 2 });

        return job.updates().pipe(
            map(update => {
                // The returned Observable will emit a value for every update to the job
                // such as when the `progress` or `status` value changes.
                Logger.info(`Job ${update.id}: progress: ${update.progress}`);
                if (update.state === JobState.COMPLETED) {
                    Logger.info(`COMPLETED ${update.id}: ${update.result}`);
                }
                return update.result;
            }),
            catchError(err => of(err.message)),
        );
    }
}

Jeśli wolisz pracować z Promises zamiast Observable z RxJS, możesz również przekonwertować aktualizacje na promise:

const job = await this.jobQueue.add({ productId, videoUrl }, { retries: 2 });

return job.updates().toPromise().then(/* ... */).catch(/* ... */);

Na tej stronie