1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#[macro_use]
extern crate log;
use http::Uri;
use clap::Clap;
use tonic::transport::Server;
use futures::future::FutureExt;
use compiler::Compiler;
use services::compiler::CompilerService;
use services::metadata::MetadataService;
use dvm_net::{prelude::*, api, tonic};
use api::grpc::dvm_compiler_server::DvmCompilerServer;
use api::grpc::dvm_bytecode_metadata_server::DvmBytecodeMetadataServer;
use dvm_net::api::grpc::{
vm_script_executor_server::VmScriptExecutorServer,
vm_module_publisher_server::VmModulePublisherServer,
};
use data_source::{GrpcDataSource, ModuleCache, DsMeter};
use anyhow::Result;
use services::vm::VmService;
use dvm_cli::config::*;
use dvm_cli::{init, version};
use futures::join;
use dvm_info::config::{InfoServiceConfig, MemoryOptions};
use dvm_cli::info_service::create_info_service;
use dvm_info::memory_check::MemoryChecker;
use runtime::vm::dvm::Dvm;
// Command-line options of the `dvm` binary, parsed with the clap derive macro.
// NOTE(review): plain `//` comments are used here on purpose - clap's derive
// turns `///` doc comments into help text, so adding them would change the
// generated CLI output.
#[derive(Debug, Clone, Clap)]
#[clap(name = "dvm", version = version!())]
#[clap(verbatim_doc_comment)]
struct Options {
// Endpoint the gRPC server is served on (handed to `serve_ext_with_shutdown`).
#[clap(
name = "listen address",
default_value = "http://[::1]:50051",
verbatim_doc_comment
)]
address: Endpoint,
// Settings forwarded to `create_info_service`.
#[clap(flatten)]
info_service: InfoServiceConfig,
// Memory settings: supplies the module cache parameter via `module_cache()`
// and is then handed to `MemoryChecker::new`.
#[clap(flatten)]
memory_config: MemoryOptions,
// URI of the data source the `GrpcDataSource` client is created with.
// `DVM_DATA_SOURCE` is not defined in this file - presumably a constant from
// `dvm_cli::config` naming the fallback environment variable; TODO confirm.
#[clap(
name = "Data-Source URI",
env = DVM_DATA_SOURCE,
default_value = "http://[::1]:50052"
)]
ds: Uri,
// Logging settings, passed to `init` at startup.
#[clap(flatten)]
logging: LoggingOptions,
// Integration settings, passed to `init` at startup.
#[clap(flatten)]
integrations: IntegrationsOptions,
}
/// Process entry point.
///
/// Cleans up the environment, parses the command line, runs the shared
/// initialisation and then hands control to [`main_internal`], whose result
/// becomes the result of the process.
fn main() -> Result<()> {
    // Must run before argument parsing, exactly as in the original ordering.
    remove_empty_env_vars();

    let opts = Options::parse();

    // Whatever `init` returns stays bound until `main` returns, i.e. for the
    // whole lifetime of the server.
    let _init_guard = init(&opts.logging, &opts.integrations);

    main_internal(opts)
}
#[tokio::main]
async fn main_internal(options: Options) -> Result<()> {
let (serv_term_tx, serv_term_rx) = futures::channel::oneshot::channel();
let (ds_term_tx, ds_term_rx) = tokio::sync::oneshot::channel();
let sigterm = dvm_cli::init_sigterm_handler_fut(move || {
match ds_term_tx.send(()) {
Ok(_) => info!("shutting down DS client"),
Err(err) => error!("unable to send sig into the DS client: {:?}", err),
}
match serv_term_tx.send(()) {
Ok(_) => info!("shutting down VM server"),
Err(err) => error!("unable to send sig into the server: {:?}", err),
}
});
let (info_service, hrm) = create_info_service(options.info_service);
let ds = GrpcDataSource::new(options.ds, Some(ds_term_rx))
.expect("Unable to instantiate GrpcDataSource.");
let ds = ModuleCache::new(DsMeter::new(ds), options.memory_config.module_cache());
let mem_checker = MemoryChecker::new(options.memory_config);
let vm_service = VmService::new(Dvm::new(ds.clone(), Some(mem_checker)), hrm);
let compiler_service = CompilerService::new(Compiler::new(ds));
let metadata_service = MetadataService::default();
tokio::spawn(sigterm);
let dvm = Server::builder()
.add_service(VmScriptExecutorServer::new(vm_service.clone()))
.add_service(VmModulePublisherServer::new(vm_service.clone()))
.add_service(DvmCompilerServer::new(compiler_service.clone()))
.add_service(DvmBytecodeMetadataServer::new(metadata_service))
.serve_ext_with_shutdown(options.address, serv_term_rx.map(|_| ()))
.map(|res| {
info!("VM server is shutted down");
res
});
if let Some(info_service) = info_service {
let (_info_service, dvm) = join!(info_service, dvm);
dvm.expect("Dvm internal error");
} else {
dvm.await.expect("Dvm internal error");
}
Ok(())
}