Handle multi schema

This commit is contained in:
Adrian Woźniak 2023-06-05 16:38:10 +02:00
parent 7c9baa2a3f
commit ca88ce1268
8 changed files with 603 additions and 104 deletions

7
Cargo.lock generated
View File

@ -1093,11 +1093,14 @@ version = "3.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"clap_lex",
"indexmap",
"once_cell",
"strsim 0.10.0",
"termcolor",
"textwrap 0.16.0",
]
@ -3276,7 +3279,11 @@ name = "migration"
version = "0.1.0"
dependencies = [
"async-std",
"clap 3.2.25",
"dotenv",
"sea-orm-migration",
"tracing",
"tracing-subscriber",
]
[[package]]

View File

@ -10,6 +10,10 @@ path = "src/lib.rs"
[dependencies]
async-std = { version = "1", features = ["attributes", "tokio1"] }
clap = { version = "3.2.25", features = ['derive'] }
tracing = { version = "0.1.37" }
tracing-subscriber = { version = "0.3.17", features = ['env-filter'] }
dotenv = { version = "0.15.0" }
[dependencies.sea-orm-migration]
version = "0.11.0"

View File

@ -1,18 +1,30 @@
pub use sea_orm_migration::prelude::*;
mod m20230603_102630_schema;
mod m20230603_102634_types;
mod m20230603_120814_addresses;
mod m20230603_120814_jobs;
pub mod schema_list;
pub mod types;
pub struct Migrator;
pub struct PublicMigrator;
#[async_trait::async_trait]
impl MigratorTrait for Migrator {
impl MigratorTrait for PublicMigrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![
Box::new(m20230603_102630_schema::Migration),
Box::new(m20230603_102634_types::Migration),
Box::new(m20230603_120814_addresses::Migration),
]
}
}
pub struct JobsMigrator;
#[async_trait::async_trait]
impl MigratorTrait for JobsMigrator {
fn migrations() -> Vec<Box<dyn MigrationTrait>> {
vec![Box::new(m20230603_120814_jobs::Migration)]
}
}

View File

@ -0,0 +1,37 @@
use sea_orm::Iterable;
use sea_orm_migration::prelude::*;
use sea_orm_migration::sea_orm::{DatabaseBackend, Statement};
use crate::schema_list::PostgreSQLSchema;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
for schema in PostgreSQLSchema::iter().skip(1) {
manager
.get_connection()
.execute(Statement::from_string(
DatabaseBackend::Postgres,
format!("CREATE SCHEMA {}", schema.as_str()),
))
.await?;
}
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
for schema in PostgreSQLSchema::iter().skip(1) {
manager
.get_connection()
.execute(Statement::from_string(
DatabaseBackend::Postgres,
format!("DROP SCHEMA {}", schema.as_str()),
))
.await?;
}
Ok(())
}
}

View File

@ -1,8 +1,7 @@
use sea_orm_migration::prelude::*;
use sea_orm_migration::sea_orm::{Iterable, Statement};
use sea_orm_migration::sea_orm::Statement;
use sea_query::expr::SimpleExpr;
use crate::schema_list::PostgreSQLSchema;
use crate::sea_orm::DatabaseBackend;
#[derive(DeriveMigrationName)]
@ -11,88 +10,14 @@ pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
for schema in PostgreSQLSchema::iter().skip(1) {
manager
.get_connection()
.execute(Statement::from_string(
DatabaseBackend::Postgres,
format!("CREATE SCHEMA {}", schema.as_str()),
))
.await?;
}
for schema in PostgreSQLSchema::iter() {
let conn = manager.get_connection();
conn.execute(Statement::from_string(
DatabaseBackend::Postgres,
format!("SET search_path = '{}'", schema.as_str()),
))
.await?;
conn.execute(Statement::from_string(
DatabaseBackend::Postgres,
"CREATE EXTENSION \"uuid-ossp\"".to_string(),
))
.await?;
}
manager
.create_table(
Table::create()
// id character varying NOT NULL,
// type text NOT NULL,
// created_by character varying,
// context jsonb,
// result jsonb,
// dry_run boolean DEFAULT false NOT NULL,
// created_at timestamp with time zone DEFAULT now() NOT NULL,
// pre_processed_at timestamp with time zone,
// confirmed_at timestamp with time zone,
// processing_at timestamp with time zone,
// completed_at timestamp with time zone,
// failed_at timestamp with time zone,
// canceled_at timestamp with time zone,
// updated_at timestamp with time zone DEFAULT now() NOT NULL,
// deleted_at timestamp with time zone
.table(BatchJob::BatchJobs)
.col(
ColumnDef::new(BatchJob::Id)
.uuid()
.not_null()
.default(SimpleExpr::Custom("uuid_generate_v4()".into()))
.primary_key(),
)
.col(ColumnDef::new(BatchJob::BatchType).string().not_null())
.col(ColumnDef::new(BatchJob::CreatedBy).uuid())
.col(ColumnDef::new(BatchJob::Context).json_binary())
.col(ColumnDef::new(BatchJob::Result).json_binary())
.col(
ColumnDef::new(BatchJob::DryRun)
.boolean()
.default(false)
.not_null(),
)
.col(
ColumnDef::new(BatchJob::CreatedAt)
.timestamp()
.default(SimpleExpr::Custom("now()".into()))
.not_null(),
)
.col(ColumnDef::new(BatchJob::PreProcessedAt).timestamp())
.col(ColumnDef::new(BatchJob::ConfirmedAt).timestamp())
.col(ColumnDef::new(BatchJob::ProcessingAt).timestamp())
.col(ColumnDef::new(BatchJob::CompletedAt).timestamp())
.col(ColumnDef::new(BatchJob::FailedAt).timestamp())
.col(ColumnDef::new(BatchJob::CanceledAt).timestamp())
.col(
ColumnDef::new(BatchJob::UpdatedAt)
.timestamp()
.default(SimpleExpr::Custom("now()".into()))
.not_null(),
)
.col(ColumnDef::new(BatchJob::DeletedAt).timestamp())
.to_owned(),
)
.await?;
manager
.create_table(
Table::create()
@ -221,26 +146,6 @@ impl MigrationTrait for Migration {
}
}
#[derive(Iden)]
enum BatchJob {
BatchJobs,
Id,
BatchType,
CreatedBy,
Context,
Result,
DryRun,
CreatedAt,
PreProcessedAt,
ConfirmedAt,
ProcessingAt,
CompletedAt,
FailedAt,
CanceledAt,
UpdatedAt,
DeletedAt,
}
#[derive(Iden)]
enum AnalyticsConfig {
AnalyticsConfigs,

View File

@ -0,0 +1,107 @@
use sea_orm_migration::prelude::*;
use sea_orm_migration::sea_orm::Statement;
use sea_query::expr::SimpleExpr;
use crate::sea_orm::DatabaseBackend;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute(Statement::from_string(
DatabaseBackend::Postgres,
"CREATE EXTENSION \"uuid-ossp\"".to_string(),
))
.await?;
manager
.create_table(
Table::create()
// id character varying NOT NULL,
// type text NOT NULL,
// created_by character varying,
// context jsonb,
// result jsonb,
// dry_run boolean DEFAULT false NOT NULL,
// created_at timestamp with time zone DEFAULT now() NOT NULL,
// pre_processed_at timestamp with time zone,
// confirmed_at timestamp with time zone,
// processing_at timestamp with time zone,
// completed_at timestamp with time zone,
// failed_at timestamp with time zone,
// canceled_at timestamp with time zone,
// updated_at timestamp with time zone DEFAULT now() NOT NULL,
// deleted_at timestamp with time zone
.table(BatchJob::BatchJobs)
.col(
ColumnDef::new(BatchJob::Id)
.uuid()
.not_null()
.default(SimpleExpr::Custom("uuid_generate_v4()".into()))
.primary_key(),
)
.col(ColumnDef::new(BatchJob::BatchType).string().not_null())
.col(ColumnDef::new(BatchJob::CreatedBy).uuid())
.col(ColumnDef::new(BatchJob::Context).json_binary())
.col(ColumnDef::new(BatchJob::Result).json_binary())
.col(
ColumnDef::new(BatchJob::DryRun)
.boolean()
.default(false)
.not_null(),
)
.col(
ColumnDef::new(BatchJob::CreatedAt)
.timestamp()
.default(SimpleExpr::Custom("now()".into()))
.not_null(),
)
.col(ColumnDef::new(BatchJob::PreProcessedAt).timestamp())
.col(ColumnDef::new(BatchJob::ConfirmedAt).timestamp())
.col(ColumnDef::new(BatchJob::ProcessingAt).timestamp())
.col(ColumnDef::new(BatchJob::CompletedAt).timestamp())
.col(ColumnDef::new(BatchJob::FailedAt).timestamp())
.col(ColumnDef::new(BatchJob::CanceledAt).timestamp())
.col(
ColumnDef::new(BatchJob::UpdatedAt)
.timestamp()
.default(SimpleExpr::Custom("now()".into()))
.not_null(),
)
.col(ColumnDef::new(BatchJob::DeletedAt).timestamp())
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(BatchJob::BatchJobs).to_owned())
.await?;
Ok(())
}
}
#[derive(Iden)]
enum BatchJob {
BatchJobs,
Id,
BatchType,
CreatedBy,
Context,
Result,
DryRun,
CreatedAt,
PreProcessedAt,
ConfirmedAt,
ProcessingAt,
CompletedAt,
FailedAt,
CanceledAt,
UpdatedAt,
DeletedAt,
}

View File

@ -1,6 +1,433 @@
use std::error::Error;
use std::fmt::Display;
use std::process::exit;
use clap::*;
use dotenv::dotenv;
use migration::schema_list::PostgreSQLSchema;
use migration::sea_orm::{ConnectOptions, Database, DbConn};
use sea_orm_migration::prelude::*;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
// const MIGRATION_DIR: &str = "./";
#[async_std::main]
async fn main() {
cli::run_cli(migration::Migrator).await;
dotenv().ok();
let mut cli: Cli = Cli::parse();
init_logger(cli.verbose);
cli.database_schema = Some(PostgreSQLSchema::Public.as_str().into());
run_cli(&cli, migration::PublicMigrator).await;
cli.database_schema = Some(PostgreSQLSchema::Jobs.as_str().into());
run_cli(&cli, migration::JobsMigrator).await;
}
pub async fn run_cli<M>(cli: &Cli, migrator: M)
where
M: MigratorTrait,
{
let url = cli
.database_url
.as_ref()
.expect("Environment variable 'DATABASE_URL' not set");
let schema = cli
.database_schema
.as_ref()
.cloned()
.unwrap_or_else(|| "public".to_owned());
let connect_options = ConnectOptions::new(url.clone())
.set_schema_search_path(schema)
.to_owned();
let db = &Database::connect(connect_options)
.await
.expect("Fail to acquire database connection");
run_migrate(migrator, db, cli.command.clone())
.await
.unwrap_or_else(handle_error);
}
pub async fn run_migrate<M>(
_: M,
db: &DbConn,
command: Option<MigrateSubcommands>,
) -> Result<(), Box<dyn Error>>
where
M: MigratorTrait,
{
match command {
Some(MigrateSubcommands::Fresh) => M::fresh(db).await?,
Some(MigrateSubcommands::Refresh) => M::refresh(db).await?,
Some(MigrateSubcommands::Reset) => M::reset(db).await?,
Some(MigrateSubcommands::Status) => M::status(db).await?,
Some(MigrateSubcommands::Up { num }) => M::up(db, num).await?,
Some(MigrateSubcommands::Down { num }) => M::down(db, Some(num)).await?,
// Some(MigrateSubcommands::Init) => run_migrate_init(MIGRATION_DIR)?,
// Some(MigrateSubcommands::Generate {
// migration_name,
// universal_time: _,
// local_time,
// }) => run_migrate_generate(MIGRATION_DIR, &migration_name, !local_time)?,
_ => M::up(db, None).await?,
};
Ok(())
}
fn init_logger(verbose: bool) {
let filter = match verbose {
true => "debug",
false => "sea_orm_migration=info",
};
let filter_layer = EnvFilter::try_new(filter).unwrap();
if verbose {
let fmt_layer = tracing_subscriber::fmt::layer();
tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
.init()
} else {
let fmt_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_level(false)
.without_time();
tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
.init()
};
}
fn handle_error<E>(error: E)
where
E: Display,
{
eprintln!("{error}");
exit(1);
}
#[derive(Parser)]
#[clap(version)]
pub struct Cli {
#[clap(action, short = 'v', long, global = true, help = "Show debug messages")]
verbose: bool,
#[clap(
value_parser,
global = true,
short = 's',
long,
env = "DATABASE_SCHEMA",
long_help = "Database schema\n \
- For MySQL and SQLite, this argument is ignored.\n \
- For PostgreSQL, this argument is optional with default value 'public'.\n"
)]
database_schema: Option<String>,
#[clap(
value_parser,
global = true,
short = 'u',
long,
env = "DATABASE_URL",
help = "Database URL"
)]
database_url: Option<String>,
#[clap(subcommand)]
command: Option<MigrateSubcommands>,
}
#[derive(Subcommand, PartialEq, Eq, Debug)]
pub enum Commands {
#[clap(
about = "Codegen related commands",
arg_required_else_help = true,
display_order = 10
)]
Generate {
#[clap(subcommand)]
command: GenerateSubcommands,
},
#[clap(about = "Migration related commands", display_order = 20)]
Migrate {
#[clap(
value_parser,
global = true,
short = 'd',
long,
help = "Migration script directory.
If your migrations are in their own crate,
you can provide the root of that crate.
If your migrations are in a submodule of your app,
you should provide the directory of that submodule.",
default_value = "./migration"
)]
migration_dir: String,
#[clap(
value_parser,
global = true,
short = 's',
long,
env = "DATABASE_SCHEMA",
long_help = "Database schema\n \
- For MySQL and SQLite, this argument is ignored.\n \
- For PostgreSQL, this argument is optional with default value 'public'.\n"
)]
database_schema: Option<String>,
#[clap(
value_parser,
global = true,
short = 'u',
long,
env = "DATABASE_URL",
help = "Database URL"
)]
database_url: Option<String>,
#[clap(subcommand)]
command: Option<MigrateSubcommands>,
},
}
#[derive(Subcommand, PartialEq, Eq, Debug, Clone)]
pub enum MigrateSubcommands {
#[clap(about = "Initialize migration directory", display_order = 10)]
Init,
#[clap(about = "Generate a new, empty migration", display_order = 20)]
Generate {
#[clap(
value_parser,
required = true,
takes_value = true,
help = "Name of the new migration"
)]
migration_name: String,
#[clap(
action,
long,
default_value = "true",
help = "Generate migration file based on Utc time",
conflicts_with = "local-time",
display_order = 1001
)]
universal_time: bool,
#[clap(
action,
long,
help = "Generate migration file based on Local time",
conflicts_with = "universal-time",
display_order = 1002
)]
local_time: bool,
},
#[clap(
about = "Drop all tables from the database, then reapply all migrations",
display_order = 30
)]
Fresh,
#[clap(
about = "Rollback all applied migrations, then reapply all migrations",
display_order = 40
)]
Refresh,
#[clap(about = "Rollback all applied migrations", display_order = 50)]
Reset,
#[clap(about = "Check the status of all migrations", display_order = 60)]
Status,
#[clap(about = "Apply pending migrations", display_order = 70)]
Up {
#[clap(
value_parser,
short,
long,
help = "Number of pending migrations to apply"
)]
num: Option<u32>,
},
#[clap(
value_parser,
about = "Rollback applied migrations",
display_order = 80
)]
Down {
#[clap(
value_parser,
short,
long,
default_value = "1",
help = "Number of applied migrations to be rolled back",
display_order = 90
)]
num: u32,
},
}
#[derive(Subcommand, PartialEq, Eq, Debug)]
pub enum GenerateSubcommands {
#[clap(about = "Generate entity")]
#[clap(arg_required_else_help = true)]
#[clap(group(ArgGroup::new("formats").args(&["compact-format", "expanded-format"])))]
#[clap(group(ArgGroup::new("group-tables").args(&["tables", "include-hidden-tables"])))]
Entity {
#[clap(action, long, help = "Generate entity file of compact format")]
compact_format: bool,
#[clap(action, long, help = "Generate entity file of expanded format")]
expanded_format: bool,
#[clap(
action,
long,
help = "Generate entity file for hidden tables (i.e. table name starts with an underscore)"
)]
include_hidden_tables: bool,
#[clap(
value_parser,
short = 't',
long,
use_value_delimiter = true,
takes_value = true,
help = "Generate entity file for specified tables only (comma separated)"
)]
tables: Vec<String>,
#[clap(
value_parser,
long,
use_value_delimiter = true,
takes_value = true,
default_value = "seaql_migrations",
help = "Skip generating entity file for specified tables (comma separated)"
)]
ignore_tables: Vec<String>,
#[clap(
value_parser,
long,
default_value = "1",
help = "The maximum amount of connections to use when connecting to the database."
)]
max_connections: u32,
#[clap(
value_parser,
short = 'o',
long,
default_value = "./",
help = "Entity file output directory"
)]
output_dir: String,
#[clap(
value_parser,
short = 's',
long,
env = "DATABASE_SCHEMA",
default_value = "public",
long_help = "Database schema\n \
- For MySQL, this argument is ignored.\n \
- For PostgreSQL, this argument is optional with default value 'public'."
)]
database_schema: String,
#[clap(
value_parser,
short = 'u',
long,
env = "DATABASE_URL",
help = "Database URL"
)]
database_url: String,
#[clap(
value_parser,
long,
default_value = "none",
help = "Automatically derive serde Serialize / Deserialize traits for the entity (none, \
serialize, deserialize, both)"
)]
with_serde: String,
#[clap(
action,
long,
help = "Generate a serde field attribute, '#[serde(skip_deserializing)]', for the primary key fields to skip them during deserialization, this flag will be affective only when '--with-serde' is 'both' or 'deserialize'"
)]
serde_skip_deserializing_primary_key: bool,
#[clap(
action,
long,
default_value = "false",
help = "Opt-in to add skip attributes to hidden columns (i.e. when 'with-serde' enabled and column name starts with an underscore)"
)]
serde_skip_hidden_column: bool,
#[clap(
action,
long,
default_value = "false",
long_help = "Automatically derive the Copy trait on generated enums.\n\
Enums generated from a database don't have associated data by default, and as such can \
derive Copy.
"
)]
with_copy_enums: bool,
#[clap(
arg_enum,
value_parser,
long,
default_value = "chrono",
help = "The datetime crate to use for generating entities."
)]
date_time_crate: DateTimeCrate,
#[clap(
action,
long,
short = 'l',
default_value = "false",
help = "Generate index file as `lib.rs` instead of `mod.rs`."
)]
lib: bool,
#[clap(
value_parser,
long,
use_value_delimiter = true,
takes_value = true,
help = "Add extra derive macros to generated model struct (comma separated), e.g. `--model-extra-derives 'ts_rs::Ts','CustomDerive'`"
)]
model_extra_derives: Vec<String>,
#[clap(
value_parser,
long,
use_value_delimiter = true,
takes_value = true,
help = r#"Add extra attributes to generated model struct, no need for `#[]` (comma separated), e.g. `--model-extra-attributes 'serde(rename_all = "camelCase")','ts(export)'`"#
)]
model_extra_attributes: Vec<String>,
},
}
#[derive(ArgEnum, Copy, Clone, Debug, PartialEq, Eq)]
pub enum DateTimeCrate {
Chrono,
Time,
}

View File

@ -4,14 +4,14 @@ use sea_orm_migration::prelude::*;
// set_schema_search_path
#[derive(Iden, EnumIter)]
pub enum PostgreSQLSchema {
Default,
Public,
Jobs,
}
impl PostgreSQLSchema {
pub fn as_str(&self) -> &'static str {
match self {
PostgreSQLSchema::Default => "",
PostgreSQLSchema::Public => "public",
PostgreSQLSchema::Jobs => "jobs",
}
}