feat: add jsonrpc

main
imspace 2023-03-05 20:01:48 +08:00
parent 1890e3a2f4
commit 8df1224ea9
15 changed files with 674 additions and 139 deletions

View File

@ -19,10 +19,7 @@
"scope": [ "scope": [
{ {
"name": "../binaries/s3si", "name": "../binaries/s3si",
"sidecar": true, "sidecar": true
"args": [
"--daemon"
]
}, },
{ {
"name": "deno", "name": "deno",
@ -30,8 +27,7 @@
"args": [ "args": [
"run", "run",
"-A", "-A",
"../../s3si.ts", "../../src/daemon.ts"
"--daemon"
] ]
} }
], ],

View File

@ -15,7 +15,9 @@ export class IPC<T extends { type: string }> {
child: Promise<Child>; child: Promise<Child>;
constructor() { constructor() {
const command = import.meta.env.DEV ? new Command("deno", ["run", "-A", "../../s3si.ts", "--daemon"]) : Command.sidecar('../binaries/s3si', ['--daemon']); const command = import.meta.env.DEV
? new Command("deno", ["run", "-A", "../../src/daemon.ts"])
: Command.sidecar('../binaries/s3si');
command.stdout.on('data', line => { command.stdout.on('data', line => {
this.callback(JSON.parse(line)) this.callback(JSON.parse(line))
}) })

View File

@ -1,12 +1,11 @@
import { App, DEFAULT_OPTS } from "./src/app.ts"; import { App, DEFAULT_OPTS } from "./src/app.ts";
import { runDaemon } from "./src/daemon.ts";
import { showError } from "./src/utils.ts"; import { showError } from "./src/utils.ts";
import { flags } from "./deps.ts"; import { flags } from "./deps.ts";
const parseArgs = (args: string[]) => { const parseArgs = (args: string[]) => {
const parsed = flags.parse(args, { const parsed = flags.parse(args, {
string: ["profilePath", "exporter", "skipMode"], string: ["profilePath", "exporter", "skipMode"],
boolean: ["help", "noProgress", "monitor", "withSummary", "daemon"], boolean: ["help", "noProgress", "monitor", "withSummary"],
alias: { alias: {
"help": "h", "help": "h",
"profilePath": ["p", "profile-path"], "profilePath": ["p", "profile-path"],
@ -39,11 +38,6 @@ Options:
); );
Deno.exit(0); Deno.exit(0);
} }
if (opts.daemon) {
await runDaemon();
Deno.exit(0);
}
const app = new App({ const app = new App({
...DEFAULT_OPTS, ...DEFAULT_OPTS,

View File

@ -20,7 +20,7 @@ if (import.meta.main) {
"-o", "-o",
`../gui/binaries/s3si-${target}`, `../gui/binaries/s3si-${target}`,
"-A", "-A",
"../s3si.ts", "../src/daemon.ts",
], ],
cwd: __dirname, cwd: __dirname,
}); });

View File

@ -1,20 +1,131 @@
import { IPC } from "./ipc/mod.ts"; // deno-lint-ignore-file no-empty-interface
import { Command } from "./ipc/types.ts";
export async function runDaemon() { import {
const ipc = new IPC<Command>({ JSONRPCServer,
ResponseError,
RPCResult,
Service,
} from "./jsonrpc/mod.ts";
import { DenoIO } from "./jsonrpc/deno.ts";
import { loginSteps } from "./iksm.ts";
import { DEFAULT_ENV, Env } from "./env.ts";
import { Queue } from "./jsonrpc/channel.ts";
export interface S3SINetworkError extends ResponseError<100> {
}
export interface S3SIService {
loginSteps(): Promise<
RPCResult<
{
authCodeVerifier: string;
url: string;
},
S3SINetworkError
>
>;
loginSteps(step2: {
authCodeVerifier: string;
login: string;
}): Promise<
RPCResult<
{
sessionToken: string;
},
S3SINetworkError
>
>;
}
enum LoggerLevel {
Debug = "debug",
Log = "log",
Warn = "warn",
Error = "error",
}
class S3SIServiceImplement implements S3SIService, Service {
loginMap: Map<string, {
step1: (url: string) => void;
promise: Promise<string>;
}> = new Map();
loggerQueue: Queue<{ level: LoggerLevel; msg: unknown[] }> = new Queue();
env: Env = {
prompts: {
promptLogin: () => {
return Promise.reject("Not implemented");
},
prompt: () => {
return Promise.reject("Not implemented");
},
},
logger: {
debug: (...msg) =>
this.loggerQueue.push({ level: LoggerLevel.Debug, msg }),
log: (...msg) => this.loggerQueue.push({ level: LoggerLevel.Log, msg }),
warn: (...msg) => this.loggerQueue.push({ level: LoggerLevel.Warn, msg }),
error: (...msg) =>
this.loggerQueue.push({ level: LoggerLevel.Error, msg }),
},
newFetcher: DEFAULT_ENV.newFetcher,
};
loginSteps(): Promise<
RPCResult<
{
authCodeVerifier: string;
url: string;
},
S3SINetworkError
>
>;
loginSteps(step2: {
authCodeVerifier: string;
login: string;
}): Promise<
RPCResult<
{
sessionToken: string;
},
S3SINetworkError
>
>;
async loginSteps(step2?: {
authCodeVerifier: string;
login: string;
}): Promise<
RPCResult<
{
authCodeVerifier: string;
url: string;
} | {
sessionToken: string;
},
S3SINetworkError
>
> {
if (!step2) {
return {
result: await loginSteps(this.env),
};
}
return {
result: await loginSteps(this.env, step2),
};
}
// deno-lint-ignore no-explicit-any
[key: string]: any;
}
if (import.meta.main) {
const service = new S3SIServiceImplement();
const server = new JSONRPCServer({
transport: new DenoIO({
reader: Deno.stdin, reader: Deno.stdin,
writer: Deno.stdout, writer: Deno.stdout,
}),
service,
}); });
while (true) { await server.serve();
const cmd = await ipc.recv();
switch (cmd.type) {
case "hello":
await ipc.send(cmd);
break;
default:
continue;
}
}
} }

View File

@ -8,11 +8,42 @@ import {
import { APIError } from "./APIError.ts"; import { APIError } from "./APIError.ts";
import { Env, Fetcher } from "./env.ts"; import { Env, Fetcher } from "./env.ts";
export async function loginManually( export async function loginSteps(
{ newFetcher, prompts: { promptLogin } }: Env, env: Env,
): Promise<string> { ): Promise<
{
authCodeVerifier: string;
url: string;
}
>;
export async function loginSteps(
env: Env,
step2: {
authCodeVerifier: string;
login: string;
},
): Promise<
{
sessionToken: string;
}
>;
export async function loginSteps(
{ newFetcher }: Env,
step2?: {
authCodeVerifier: string;
login: string;
},
): Promise<
{
authCodeVerifier: string;
url: string;
} | {
sessionToken: string;
}
> {
const fetch = newFetcher(); const fetch = newFetcher();
if (!step2) {
const state = urlBase64Encode(random(36)); const state = urlBase64Encode(random(36));
const authCodeVerifier = urlBase64Encode(random(32)); const authCodeVerifier = urlBase64Encode(random(32));
const authCvHash = await crypto.subtle.digest( const authCvHash = await crypto.subtle.digest(
@ -51,10 +82,12 @@ export async function loginManually(
}, },
); );
const login = (await promptLogin(res.url)).trim(); return {
if (!login) { authCodeVerifier,
throw new Error("No login URL provided"); url: res.url,
} };
} else {
const { login, authCodeVerifier } = step2;
const loginURL = new URL(login); const loginURL = new URL(login);
const params = new URLSearchParams(loginURL.hash.substring(1)); const params = new URLSearchParams(loginURL.hash.substring(1));
const sessionTokenCode = params.get("session_token_code"); const sessionTokenCode = params.get("session_token_code");
@ -71,7 +104,27 @@ export async function loginManually(
throw new Error("No session token found"); throw new Error("No session token found");
} }
return sessionToken; return { sessionToken };
}
}
export async function loginManually(
env: Env,
): Promise<string> {
const { prompts: { promptLogin } } = env;
const step1 = await loginSteps(env);
const { url, authCodeVerifier } = step1;
const login = (await promptLogin(url)).trim();
if (!login) {
throw new Error("No login URL provided");
}
const step2 = await loginSteps(env, { authCodeVerifier, login });
return step2.sessionToken;
} }
export async function getGToken( export async function getGToken(

View File

@ -1,54 +0,0 @@
/// <reference no-default-lib="true" />
/// <reference lib="ESNext" />
/// <reference lib="dom" />
/// <reference lib="dom.iterable" />
/// <reference lib="dom.asynciterable" />
import type { ExtractType } from "./types.ts";
export class WorkerChannel<T extends { type: string }> {
queue: T[] = [];
waiting: ((value: T) => void)[] = [];
constructor(private worker?: Worker) {
const callback = ({ data }: { data: unknown }) => {
const waiting = this.waiting.shift();
if (waiting) {
waiting(data as T);
} else {
this.queue.push(data as T);
}
};
if (worker) {
worker.addEventListener("message", callback);
} else {
self.addEventListener("message", callback);
}
}
async recvType<K extends T["type"]>(
type: K,
): Promise<ExtractType<T, K>> {
const data = await this.recv();
if (data.type !== type) {
throw new Error(`Unexpected type: ${data.type}`);
}
return data as ExtractType<T, K>;
}
recv(): Promise<T> {
return new Promise<T>((resolve) => {
const data = this.queue.shift();
if (data) {
resolve(data);
} else {
this.waiting.push(resolve);
}
});
}
send(data: T) {
if (this.worker) {
this.worker.postMessage(data);
} else {
self.postMessage(data);
}
}
}

View File

@ -1,2 +1 @@
export { IPC } from "./stdio.ts"; export { IPC } from "./stdio.ts";
export { WorkerChannel } from "./channel.ts";

50
src/jsonrpc/channel.ts Normal file
View File

@ -0,0 +1,50 @@
export class Queue<T> {
queue: T[] = [];
waiting: ((value: T | undefined) => void)[] = [];
pop = (): Promise<T | undefined> => {
return new Promise<T | undefined>((resolve) => {
const data = this.queue.shift();
if (data) {
resolve(data);
} else {
this.waiting.push(resolve);
}
});
};
// TODO: wait until the data is queued if queue has limit
push = (data: T): Promise<void> => {
const waiting = this.waiting.shift();
if (waiting) {
waiting(data);
} else {
this.queue.push(data);
}
return Promise.resolve();
};
close = (): Promise<void> => {
for (const resolve of this.waiting) {
resolve(undefined);
}
return Promise.resolve();
};
}
export function channel<T>() {
const q1 = new Queue<T>();
const q2 = new Queue<T>();
const close = async () => {
await q1.close();
await q2.close();
};
return [{
send: q1.push,
recv: q2.pop,
close,
}, {
send: q2.push,
recv: q1.pop,
close,
}] as const;
}

128
src/jsonrpc/client.ts Normal file
View File

@ -0,0 +1,128 @@
// deno-lint-ignore-file no-explicit-any
import {
ID,
Request,
Response,
ResponseError,
RPCResult,
Service,
Transport,
} from "./types.ts";
export class JSONRPCClient<S extends Service> {
protected nextId = 1;
protected transport: Transport;
protected requestMap: Map<
ID,
(result: RPCResult<any, ResponseError>) => void
> = new Map();
protected fatal: unknown = undefined;
protected task: Promise<void>;
constructor(
{ transport }: { transport: Transport },
) {
this.transport = transport;
this.task = this.run();
}
protected setFatal(e: unknown) {
if (!this.fatal) {
this.fatal = e;
}
}
protected handleResponse(
resp: Response<unknown, ResponseError>,
) {
const { id } = resp;
const callback = this.requestMap.get(id);
if (callback) {
this.requestMap.delete(id);
callback(resp);
} else {
this.setFatal(new Error("invalid response id: " + String(id)));
}
}
// receive response from server
protected async run() {
try {
while (true) {
const data = await this.transport.recv();
if (data === undefined) {
this.setFatal(new Error("transport closed"));
break;
}
const result = JSON.parse(data);
if (Array.isArray(result)) {
for (const resp of result) {
this.handleResponse(resp);
}
} else {
this.handleResponse(result);
}
}
} catch (e) {
this.setFatal(e);
}
}
makeRequest<
K extends keyof S & string,
P extends Parameters<S[K]>,
>(
method: K,
params: P,
): Request<K, P> {
const req = {
jsonrpc: "2.0",
id: this.nextId,
method,
params,
} as const;
this.nextId += 1;
return req;
}
async call<
K extends keyof S & string,
P extends Parameters<S[K]>,
R extends ReturnType<S[K]>,
>(
method: K,
...params: P
): Promise<R> {
if (this.fatal) {
throw this.fatal;
}
const req = this.makeRequest(method, params);
await this.transport.send(JSON.stringify(req));
return new Promise<R>((res, rej) => {
this.requestMap.set(req.id, (result) => {
if (result.error) {
rej(result.error);
} else {
res(result.result);
}
});
});
}
getProxy(): S {
const proxy = new Proxy({}, {
get: (_, method: string) => {
return (...params: unknown[]) => {
return this.call(method, ...params as any);
};
},
});
return proxy as S;
}
async close() {
await this.transport.close();
await this.task;
}
}

32
src/jsonrpc/deno.ts Normal file
View File

@ -0,0 +1,32 @@
import { io, writeAll } from "../../deps.ts";
import { Transport } from "./types.ts";
export class DenoIO implements Transport {
lines: AsyncIterableIterator<string>;
writer: Deno.Writer & Deno.Closer;
constructor({ reader, writer }: {
reader: Deno.Reader;
writer: Deno.Writer & Deno.Closer;
}) {
this.lines = io.readLines(reader);
this.writer = writer;
}
async recv(): Promise<string | undefined> {
const result = await this.lines.next();
if (!result.done) {
return JSON.parse(result.value);
}
return undefined;
}
async send(data: string) {
await writeAll(
this.writer,
new TextEncoder().encode(data + "\n"),
);
}
async close() {
await this.writer.close();
}
}

View File

@ -0,0 +1,43 @@
import { channel } from "./channel.ts";
import { JSONRPCClient } from "./client.ts";
import { JSONRPCServer } from "./server.ts";
import { RPCResult, Service } from "./types.ts";
import { assertEquals } from "../../dev_deps.ts";
export interface SimpleService {
add(a: number, b: number): Promise<
RPCResult<number>
>;
// deno-lint-ignore no-explicit-any
[key: string]: any;
}
class SimpleServiceImplement implements SimpleService, Service {
async add(a: number, b: number): Promise<RPCResult<number>> {
return {
result: a + b,
};
}
// deno-lint-ignore no-explicit-any
[key: string]: any;
}
Deno.test("jsonrpc", async () => {
const [c1, c2] = channel<string>();
const service = new SimpleServiceImplement();
const server = new JSONRPCServer({
transport: c1,
service,
});
const serverTask = server.serve().catch((e) => console.error(e));
const client = new JSONRPCClient<SimpleService>({
transport: c2,
});
const p = client.getProxy();
assertEquals((await p.add(1, 2)).result, 3);
await client.close();
await server.close();
await serverTask;
});

2
src/jsonrpc/mod.ts Normal file
View File

@ -0,0 +1,2 @@
export * from "./types.ts";
export * from "./server.ts";

113
src/jsonrpc/server.ts Normal file
View File

@ -0,0 +1,113 @@
// deno-lint-ignore-file no-explicit-any
import {
ERROR_INVALID_REQUEST,
ERROR_METHOD_NOT_FOUND,
ERROR_PARSEE_ERROR,
ID,
Request,
Response,
ResponseError,
Service,
Transport,
} from "./types.ts";
export class JSONRPCServer {
protected transport: Transport;
protected service: Service;
protected fatal = false;
protected task: Promise<void> = Promise.resolve();
constructor(
{ transport, service }: { transport: Transport; service: Service },
) {
this.transport = transport;
this.service = service;
}
async handleRequest(
req: Request<string, any>,
): Promise<Response<any, ResponseError>> {
const { jsonrpc, id, method, params } = req;
const res = {
jsonrpc: "2.0",
id,
} as const;
if (jsonrpc !== "2.0") {
this.fatal = true;
return {
...res,
error: ERROR_INVALID_REQUEST,
};
}
const func = this.service[method];
if (!func) {
return {
...res,
error: ERROR_METHOD_NOT_FOUND,
};
}
const result = await func(...params);
return {
...res,
result,
};
}
// `handle` will never throw error
async handle(
data: string,
): Promise<Response<any, ResponseError> | Response<any, ResponseError>[]> {
let req: Request<string, any>;
try {
req = JSON.parse(data);
} catch (_) {
this.fatal = true;
return {
jsonrpc: "2.0",
id: null,
error: ERROR_PARSEE_ERROR,
};
}
const internalError: (id: ID) => (
e: unknown,
) => Response<any, ResponseError<32000, unknown>> = (id) =>
(
e,
) => ({
jsonrpc: "2.0",
id: id,
error: {
code: 32000,
message: "Internal error",
data: e,
},
});
// batch request
if (Array.isArray(req)) {
return await Promise.all(
req.map((req) => this.handleRequest(req).catch(internalError(req.id))),
);
} else {
return await this.handleRequest(req).catch(internalError(req.id));
}
}
async serve() {
while (!this.fatal) {
const data = await this.transport.recv();
if (data === undefined) {
break;
}
this.handle(data).then((result) =>
this.transport.send(JSON.stringify(result))
).catch((e) => {
console.error("Failed to handle request", e);
});
}
}
async close() {
await this.transport.close();
}
}

66
src/jsonrpc/types.ts Normal file
View File

@ -0,0 +1,66 @@
export type ID = string | number | null;
// deno-lint-ignore no-explicit-any
export type ResponseError<Code extends number = number, Data = any> = {
code: Code;
message: string;
data?: Data;
};
export type Request<Method extends string, Params> = {
jsonrpc: "2.0";
method: Method;
params: Params;
id: ID;
};
export type Notification<Method extends string, Params> = {
jsonrpc: "2.0";
method: Method;
params: Params;
};
// deno-lint-ignore no-explicit-any
export type Response<Result, Error extends ResponseError<number, any>> = {
jsonrpc: "2.0";
id: ID;
} & RPCResult<Result, Error>;
export type Transport = {
send: (data: string) => Promise<void>;
recv: () => Promise<string | undefined>;
close: () => Promise<void>;
};
export type RPCResult<Result, Error extends ResponseError = ResponseError> = {
result?: Result;
error?: Error;
};
export type Service = {
[P in string]: (
// deno-lint-ignore no-explicit-any
...args: any[]
) => Promise<RPCResult<unknown, ResponseError>>;
};
export const ERROR_PARSEE_ERROR: ResponseError<-32700> = {
code: -32700,
message: "Parse error",
};
export const ERROR_INVALID_REQUEST: ResponseError<-32600> = {
code: -32600,
message: "Invalid Request",
};
export const ERROR_METHOD_NOT_FOUND: ResponseError<-32601> = {
code: -32601,
message: "Method not found",
};
export const ERROR_INVALID_PARAMS: ResponseError<-32602> = {
code: -32602,
message: "Invalid params",
};
export const ERROR_INTERNAL_ERROR: ResponseError<-32603> = {
code: -32603,
message: "Internal error",
};