Watcher and initial load performance improvements (#357)

* Set low CPU priority on watcher

Fixes #247.

* Batch stat and readdir calls

* Fix fs.exists

callbackify seems to always adds an error as the first argument. Opted
to just use the promise for this one.

* Batch lstat

* Add maximum time for flushing batches
This commit is contained in:
Asher 2019-03-27 17:04:19 -05:00 committed by Kyle Carberry
parent 38a0706b18
commit 259095eae2
4 changed files with 136 additions and 8 deletions

View file

@ -1,12 +1,42 @@
import * as fs from "fs";
import { callbackify } from "util";
import { ClientProxy } from "../../common/proxy";
import { ClientProxy, Batch } from "../../common/proxy";
import { IEncodingOptions, IEncodingOptionsCallback } from "../../common/util";
import { FsModuleProxy, Stats as IStats, WatcherProxy, WriteStreamProxy } from "../../node/modules/fs";
import { Writable } from "./stream";
// tslint:disable no-any
class StatBatch extends Batch<IStats, { path: fs.PathLike }> {
public constructor(private readonly proxy: FsModuleProxy) {
super();
}
protected remoteCall(batch: { path: fs.PathLike }[]): Promise<(IStats | Error)[]> {
return this.proxy.statBatch(batch);
}
}
class LstatBatch extends Batch<IStats, { path: fs.PathLike }> {
public constructor(private readonly proxy: FsModuleProxy) {
super();
}
protected remoteCall(batch: { path: fs.PathLike }[]): Promise<(IStats | Error)[]> {
return this.proxy.lstatBatch(batch);
}
}
class ReaddirBatch extends Batch<Buffer[] | fs.Dirent[] | string[], { path: fs.PathLike, options: IEncodingOptions }> {
public constructor(private readonly proxy: FsModuleProxy) {
super();
}
protected remoteCall(queue: { path: fs.PathLike, options: IEncodingOptions }[]): Promise<(Buffer[] | fs.Dirent[] | string[] | Error)[]> {
return this.proxy.readdirBatch(queue);
}
}
class Watcher extends ClientProxy<WatcherProxy> implements fs.FSWatcher {
public close(): void {
this.proxy.close();
@ -28,7 +58,15 @@ class WriteStream extends Writable<WriteStreamProxy> implements fs.WriteStream {
}
export class FsModule {
public constructor(private readonly proxy: FsModuleProxy) {}
private readonly statBatch: StatBatch;
private readonly lstatBatch: LstatBatch;
private readonly readdirBatch: ReaddirBatch;
public constructor(private readonly proxy: FsModuleProxy) {
this.statBatch = new StatBatch(this.proxy);
this.lstatBatch = new LstatBatch(this.proxy);
this.readdirBatch = new ReaddirBatch(this.proxy);
}
public access = (path: fs.PathLike, mode: number | undefined | ((err: NodeJS.ErrnoException) => void), callback?: (err: NodeJS.ErrnoException) => void): void => {
if (typeof mode === "function") {
@ -72,9 +110,7 @@ export class FsModule {
}
public exists = (path: fs.PathLike, callback: (exists: boolean) => void): void => {
callbackify(this.proxy.exists)(path, (exists) => {
callback!(exists as any);
});
this.proxy.exists(path).then((exists) => callback(exists)).catch(() => callback(false));
}
public fchmod = (fd: number, mode: string | number, callback: (err: NodeJS.ErrnoException) => void): void => {
@ -124,7 +160,7 @@ export class FsModule {
}
public lstat = (path: fs.PathLike, callback: (err: NodeJS.ErrnoException, stats: fs.Stats) => void): void => {
callbackify(this.proxy.lstat)(path, (error, stats) => {
callbackify(this.lstatBatch.add)({ path }, (error, stats) => {
callback(error, stats && new Stats(stats));
});
}
@ -175,7 +211,7 @@ export class FsModule {
callback = options;
options = undefined;
}
callbackify(this.proxy.readdir)(path, options, callback!);
callbackify(this.readdirBatch.add)({ path, options }, callback!);
}
public readlink = (path: fs.PathLike, options: IEncodingOptionsCallback, callback?: (err: NodeJS.ErrnoException, linkString: string | Buffer) => void): void => {
@ -203,7 +239,7 @@ export class FsModule {
}
public stat = (path: fs.PathLike, callback: (err: NodeJS.ErrnoException, stats: fs.Stats) => void): void => {
callbackify(this.proxy.stat)(path, (error, stats) => {
callbackify(this.statBatch.add)({ path }, (error, stats) => {
callback(error, stats && new Stats(stats));
});
}

View file

@ -81,3 +81,74 @@ export enum Module {
NodePty = "node-pty",
Trash = "trash",
}
interface BatchItem<T, A> {
args: A;
resolve: (t: T) => void;
reject: (e: Error) => void;
}
/**
* Batch remote calls.
*/
export abstract class Batch<T, A> {
private idleTimeout: number | NodeJS.Timer | undefined;
private maxTimeout: number | NodeJS.Timer | undefined;
private batch = <BatchItem<T, A>[]>[];
public constructor(
/**
* Flush after reaching this amount of time.
*/
private readonly maxTime = 1000,
/**
* Flush after reaching this count.
*/
private readonly maxCount = 100,
/**
* Flush after not receiving more requests for this amount of time.
*/
private readonly idleTime = 100,
) {}
public add = (args: A): Promise<T> => {
return new Promise((resolve, reject) => {
this.batch.push({
args,
resolve,
reject,
});
if (this.batch.length >= this.maxCount) {
this.flush();
} else {
clearTimeout(this.idleTimeout as any);
this.idleTimeout = setTimeout(this.flush, this.idleTime);
if (typeof this.maxTimeout === "undefined") {
this.maxTimeout = setTimeout(this.flush, this.maxTime);
}
}
});
}
protected abstract remoteCall(batch: A[]): Promise<(T | Error)[]>;
private flush = (): void => {
clearTimeout(this.idleTimeout as any);
clearTimeout(this.maxTimeout as any);
this.maxTimeout = undefined;
const batch = this.batch;
this.batch = [];
this.remoteCall(batch.map((q) => q.args)).then((results) => {
batch.forEach((item, i) => {
const result = results[i];
if (result && result instanceof Error) {
item.reject(result);
} else {
item.resolve(result);
}
});
}).catch((error) => batch.forEach((item) => item.reject(error)));
}
}

View file

@ -156,6 +156,10 @@ export class FsModuleProxy {
return this.makeStatsSerializable(await promisify(fs.lstat)(path));
}
public async lstatBatch(args: { path: fs.PathLike }[]): Promise<(Stats | Error)[]> {
return Promise.all(args.map((a) => this.lstat(a.path).catch((e) => e)));
}
public mkdir(path: fs.PathLike, mode: number | string | fs.MakeDirectoryOptions | undefined | null): Promise<void> {
return promisify(fs.mkdir)(path, mode);
}
@ -182,6 +186,10 @@ export class FsModuleProxy {
return promisify(fs.readdir)(path, options);
}
public readdirBatch(args: { path: fs.PathLike, options: IEncodingOptions }[]): Promise<(Buffer[] | fs.Dirent[] | string[] | Error)[]> {
return Promise.all(args.map((a) => this.readdir(a.path, a.options).catch((e) => e)));
}
public readlink(path: fs.PathLike, options: IEncodingOptions): Promise<string | Buffer> {
return promisify(fs.readlink)(path, options);
}
@ -202,6 +210,10 @@ export class FsModuleProxy {
return this.makeStatsSerializable(await promisify(fs.stat)(path));
}
public async statBatch(args: { path: fs.PathLike }[]): Promise<(Stats | Error)[]> {
return Promise.all(args.map((a) => this.stat(a.path).catch((e) => e)));
}
public symlink(target: fs.PathLike, path: fs.PathLike, type?: fs.symlink.Type | null): Promise<void> {
return promisify(fs.symlink)(target, path, type);
}

View file

@ -1,7 +1,9 @@
import * as cp from "child_process";
import * as fs from "fs";
import * as os from "os";
import * as path from "path";
import * as vm from "vm";
import { logger } from "@coder/logger";
import { buildDir, isCli } from "../constants";
let ipcMsgBuffer: Buffer[] | undefined = [];
@ -151,6 +153,13 @@ export const forkModule = (modulePath: string, args?: string[], options?: cp.For
} else {
proc = cp.spawn(process.execPath, ["--require", "ts-node/register", "--require", "tsconfig-paths/register", process.argv[1], ...forkArgs], forkOptions);
}
if (args && args[0] === "--type=watcherService" && os.platform() === "linux") {
cp.exec(`renice -n 19 -p ${proc.pid}`, (error) => {
if (error) {
logger.warn(error.message);
}
});
}
return proc;
};