機能このページを編集.md

Workers

@lazarv/react-server"use worker" ディレクティブを使用すると、重い計算やブロックするタスクを別のスレッドにオフロードできます。サーバー側では、"use worker" でマークされた関数は Node.js ワーカースレッド (node:worker_threads) で実行されます。クライアント側では、同じディレクティブがコードをWeb Worker(ブラウザのWorker API)で実行します。いずれの場合も、ワーカー関数は通常の非同期関数と同様にインポートして呼び出せます。スレッドの作成、メッセージの受け渡し、シリアライズはランタイムが透過的に処理します。

メインスレッドとワーカー間でやり取りされる全データは、React Server Components (RSC) Flightプロトコルを用いてシリアライズされます。これによりワーカー関数は単純な値だけでなく、*ReactエレメントSuspenseバウンダリPromiseuse()フックによる遅延レンダリング用)、およびReadableStreamも返すことが可能です。

ランタイムが先頭に "use worker" を記述したファイルを検出すると、ビルド時にすべてのエクスポートを薄いプロキシ関数に置き換えます。元のモジュールコードはワーカースレッド内で実行される仮想モジュールに移動されます。エクスポートされた関数を呼び出すと、プロキシは

  1. RSC Flightプロトコルを使用して引数をシリアライズする。
  2. 関数名とシリアライズされた引数を含むメッセージをワーカースレッド(またはWeb Worker)に投稿する。
  3. ワーカーは引数をデシリアライズし、関数を実行し、戻り値を再びシリアライズする。
  4. プロキシは結果をデシリアライズし、返されたPromiseを解決する。

ReadableStreamの値は、postMessage転送リストを介してスレッド間で転送(ゼロコピー)されるため、ストリーミング結果の処理が効率的です。

サーバー上では、"use worker" モジュールはNode.js ワーカースレッドで実行されます。ワーカーは最初の呼び出し時に遅延起動され、以降の呼び出しでは再利用されます。ワーカーがクラッシュした場合、自動的に再起動されます。

基本的な使用方法

ファイルの先頭に "use worker" ディレクティブを記述し、非同期関数をエクスポートします。

"use worker"; export async function computeFactorial(n) { if (n <= 1) return 1; return n * computeFactorial(n - 1); }

任意のサーバーコンポーネントからインポートして呼び出せます。

import { computeFactorial } from "./computeFactorial"; export default async function FactorialPage({ number }) { const result = await computeFactorial(42); return <div>Factorial of 42 is {result}</div>; }

computeFactorial 関数は専用のワーカースレッドで実行され、メインのサーバースレッドを他のリクエスト処理に自由に使える状態に保ちます。

CPU負荷の高い計算

ワーカーはサーバーサイドレンダリングを妨げる可能性のあるCPU負荷の高いタスクに最適です。

"use worker"; export async function findPrimes(limit) { const start = Date.now(); const sieve = new Uint8Array(limit + 1); const primes = []; for (let i = 2; i <= limit; i++) { if (!sieve[i]) { primes.push(i); for (let j = i * i; j <= limit; j += i) sieve[j] = 1; } } return { count: primes.length, largest: primes.at(-1), duration: Date.now() - start, }; }
import { findPrimes } from "./worker"; export default async function PrimesPage() { const result = await findPrimes(100_000); return <div>Found {result.count} primes in {result.duration}ms</div>; }

Node.js APIへのアクセス

サーバーワーカーはNode.jsで実行されるため、Node.jsの組み込みモジュールをすべて利用できます。

"use worker"; import { workerData } from "node:worker_threads"; import { setTimeout } from "node:timers/promises"; export async function getSystemInfo() { await setTimeout(100); const mem = process.memoryUsage(); return { heapUsed: (mem.heapUsed / 1024 / 1024).toFixed(1) + " MB", uptime: process.uptime().toFixed(1) + "s", workerData: JSON.stringify(workerData), }; }

他モジュールのインポート

ワーカーファイルは他のモジュールからインポートできます。"use worker" ディレクティブを持つファイルのみがワーカーエントリとなります(インポートされたモジュールは通常通りワーカーにバンドルされます)。

// WorkerModule.mjs — 通常のモジュール(ディレクティブは不要) export function getSystemInfo() { return { platform: process.platform, nodeVersion: process.version, }; }
"use worker"; import { getSystemInfo } from "./WorkerModule.mjs"; export async function getWorkerSystemInfo() { return getSystemInfo(); }

通信にはRSC Flightプロトコルが使用されるため、ワーカー関数はReactエレメントSuspenseバウンダリを含むコンポーネント)を返すことができます。ランタイムはコンポーネントツリー全体をシリアライズし、呼び出し元で再構築します。

"use worker"; import { Suspense } from "react"; async function ExpensiveChart() { // 高コストなデータ処理をシミュレートする const data = await computeChartData(); return ( <div className="chart"> <h3>Results</h3> <ul> {data.map((d) => <li key={d.id}>{d.label}: {d.value}</li>)} </ul> </div> ); } export async function getChart() { return ( <Suspense fallback={<p>Loading chart...</p>}> <ExpensiveChart /> </Suspense> ); }
import { getChart } from "./chartWorker"; export default async function Dashboard() { const chart = await getChart(); return <main>{chart}</main>; }

<Suspense>バウンダリは期待通りに動作します。ExpensiveChartがワーカースレッドで解決される間、フォールバックが表示されます。

ワーカー関数は ReadableStream を返すことができます。このストリームはワーカーとメインスレッド間で転送(ゼロコピー)されるため、大規模データや増分データの処理に効率的です。通常このストリームはクライアントコンポーネントに渡され、そこで順次読み込まれます。

"use worker"; export async function streamActivity() { const steps = [ { phase: "init", msg: "Initializing" }, { phase: "process", msg: "Processing data" }, { phase: "compute", msg: "Running computation" }, { phase: "done", msg: "Complete" }, ]; return new ReadableStream({ async start(controller) { for (const step of steps) { controller.enqueue( JSON.stringify({ ...step, time: new Date().toISOString() }) + "\n" ); await new Promise((r) => setTimeout(r, 300)); } controller.close(); }, }); }

クライアントコンポーネントでストリームを消費する。

import { streamActivity } from "./worker"; import { StreamViewer } from "./StreamViewer"; export default async function ActivityPage() { const stream = await streamActivity(); return <StreamViewer data={stream} />; }
"use client"; import { useState, useEffect } from "react"; export function StreamViewer({ data }) { const [entries, setEntries] = useState([]); useEffect(() => { const reader = data.getReader(); const decoder = new TextDecoder(); async function read() { while (true) { const { done, value } = await reader.read(); if (done) break; const text = typeof value === "string" ? value : decoder.decode(value); const lines = text.trim().split("\n").filter(Boolean); for (const line of lines) { setEntries((prev) => [...prev, JSON.parse(line)]); } } } read(); }, [data]); return ( <ul> {entries.map((e, i) => ( <li key={i}>[{e.phase}] {e.msg}</li> ))} </ul> ); }

サーバー側では、ワーカー関数内で @lazarv/react-serveruseSignal() を使用することで、現在のリクエストの AbortSignal を取得できます。これにより、クライアントが切断された場合やリクエストが中止された場合に、長時間実行中の操作をキャンセルすることが可能になります。

"use worker"; import { useSignal } from "@lazarv/react-server"; export async function streamActivity() { const signal = useSignal(); return new ReadableStream({ async start(controller) { for (let i = 0; i < 100; i++) { if (signal?.aborted) break; controller.enqueue(`Step ${i}\n`); await new Promise((r) => setTimeout(r, 100)); } controller.close(); }, }); }

リクエストが中止された場合(例:クライアントがページを離れた場合)、signal.abortedtrue になり、ワーカーはデータの生成を停止します。

注記: useSignal() はサーバーワーカーでのみ利用可能です。クライアントサイドのWeb Workersではサポートされていません。

同じ"use worker"ディレクティブはクライアントサイドコードでも機能します。"use client"コンポーネントが"use worker"モジュールからインポートすると、ランタイムは自動的にブラウザ内にWeb Workerを作成します。関数の引数と戻り値はRSC Flightプロトコルを用いてシリアライズされ、メインスレッドとWeb Worker間で転送されます。

これにより重い計算処理がバックグラウンドで実行されている間も、ブラウザのメインスレッドは応答性を維持します。UIのジャークや操作のフリーズが発生しません。

基本的な使用方法

ワーカーモジュールを作成します("use client" は不要で "use worker" のみが必要です)。

"use worker"; export async function fibonacci(n) { const start = performance.now(); let a = 0n, b = 1n; for (let i = 0; i < n; i++) { [a, b] = [b, a + b]; } return { n, digits: a.toString().length, duration: (performance.now() - start).toFixed(2), }; } export async function sortBenchmark(size) { const start = performance.now(); const arr = Float64Array.from({ length: size }, () => Math.random()); arr.sort(); return { size: size.toLocaleString(), duration: (performance.now() - start).toFixed(2), median: arr[Math.floor(arr.length / 2)].toFixed(8), }; }

クライアントコンポーネントから使用します。

"use client"; import { useState, useCallback } from "react"; import { fibonacci, sortBenchmark } from "./WebWorker.jsx"; export function ComputePanel() { const [result, setResult] = useState(null); const [loading, setLoading] = useState(false); const runFibonacci = useCallback(async () => { setLoading(true); const res = await fibonacci(1000); setResult(res); setLoading(false); }, []); return ( <div> <button onClick={runFibonacci} disabled={loading}> {loading ? "Computing..." : "Compute Fibonacci(1000)"} </button> {result && ( <p>{result.digits} digits, computed in {result.duration}ms</p> )} </div> ); }

fibonacciの呼び出しは完全にWeb Worker内で実行されます。ブラウザのUIは、重いBigInt計算中も応答性を維持します。

遅延Promiseの返却

クライアントサイドワーカー関数は、Promise値を含むオブジェクトを返すことができます。遅延レンダリングのために、Reactのuse()フックでこれらを利用できます。

"use worker"; export async function analyzeDataset() { return { status: "processing", data: new Promise((resolve) => { setTimeout(() => { const values = Array.from({ length: 10000 }, () => Math.random() * 100); const mean = values.reduce((a, b) => a + b) / values.length; resolve({ samples: values.length, mean: mean.toFixed(2), }); }, 2000); }), }; }
"use client"; import { Suspense, use, useState, useCallback } from "react"; import { analyzeDataset } from "./WebWorker.jsx"; function AnalysisResult({ dataPromise }) { const data = use(dataPromise); return <pre>{JSON.stringify(data, null, 2)}</pre>; } export function AnalysisPanel() { const [result, setResult] = useState(null); const run = useCallback(async () => { const res = await analyzeDataset(); setResult(res); }, []); return ( <div> <button onClick={run}>Analyze</button> {result && ( <Suspense fallback={<p>Analyzing...</p>}> <AnalysisResult dataPromise={result.data} /> </Suspense> )} </div> ); }

Web Workersからのストリーミング

クライアントサイドワーカーもReadableStream値を返すことができます。このストリームはWebワーカーからメインスレッドへ転送(ゼロコピー)されます。

"use worker"; export async function streamComputations() { const operations = [ "Generating matrix", "Computing dot product", "Normalizing vectors", "Finalizing results", ]; return new ReadableStream({ async start(controller) { for (let i = 0; i < operations.length; i++) { const result = Array.from({ length: 50000 }, () => Math.random()) .reduce((a, b) => a + b, 0); controller.enqueue( JSON.stringify({ step: i + 1, total: operations.length, operation: operations[i], result: result.toFixed(2), }) + "\n" ); await new Promise((r) => setTimeout(r, 350)); } controller.close(); }, }); }

エッジおよびサーバーレスランタイム(Cloudflare Workers、Vercel Edge、Netlify Edge、Deno Deploy)では、node:worker_threads は利用できません。これらの環境では、ランタイムは自動的にインプロセス実行にフォールバックします。つまりワーカー関数はスレッド化やシリアライゼーションのオーバーヘッドなしに直接呼び出されます。

これは "use worker" を使用するモジュールが完全に移植性を保つことを意味します。同じコードがNode.js(実際のワーカースレッドを使用)とエッジ(直接実行)の両方で動作します。コードを変更したり条件分岐を追加したりする必要はありません。

注記: エッジランタイムでは、ワーカー関数は別スレッドで実行されません。これらはサーバーコードの他の部分と同じプロセス内で実行されます。つまり、真のワーカースレッドのような並行処理の利点は得られませんが、コードはすべてのデプロイ先で互換性を保ちます。

ランタイムはコードがワーカースレッド内で実行されているかどうかを実行時に検出できるisWorker()ヘルパー関数を提供します。これは実際のワーカー内でのみ実行すべきロジックを条件付きで実行する必要がある場合に有用です。例えばメインサーバープロセスを誤って終了させずにワーカーを終了させるためにprocess.exit()を呼び出す場合などが挙げられます。

@lazarv/react-server/worker から isWorker をインポートします。このインポートパスは、サーバーワーカー(Node.js ワーカースレッド)とクライアントワーカー(Web ワーカー)の両方で動作します:

import { isWorker } from "@lazarv/react-server/worker";

Server workerの例

一般的なユースケースとして、ワーカースレッドを安全に終了させる方法があります。エッジランタイムではワーカー関数はプロセス内で実行されるため、process.exit() を呼び出すとサーバー全体が強制終了されます。これを防ぐには isWorker() を使用してください。

"use worker"; import { isWorker } from "@lazarv/react-server/worker"; export async function terminate() { if (isWorker()) { process.exit(0); // ワーカースレッドのみを終了し、サーバーは終了しない } }

Client workerの例

クライアントサイドのWeb Workerでは、isWorker()trueを返すため、ワーカー環境を検出できます:

"use worker"; import { isWorker } from "@lazarv/react-server/worker"; export async function checkIsWorker() { return isWorker(); // Web Worker内で実行されている場合にtrue }

注記: "use worker"関数がプロセス内で実行されるEdgeランタイムでは、コードが実際に別個のワーカースレッドで実行されていないため、isWorker()falseを返します。

"use worker"を使用する際には、以下の制約事項を念頭に置いてください。

シリアライゼーション

モジュールレベルディレクティブ

非同期関数のみ

状態共有なし

サーバー固有のAPI

クライアントサイド制約

Edge/serverless制約

開発モード

ワーカーの全機能(サーバーサイド計算、ワーカー内でのReactエレメントのレンダリング、ストリーミング、フィボナッチを用いたクライアントサイドWebワーカー、ソートベンチマーク、遅延プロミス、ストリーミング)を実証する完全な動作例については、公式リポジトリ内のuse-workerの例を参照してください。