dmjohnsson23/micdrop-ruby

Micdrop

An extensible framework/library for migrating data from one source to another through a declarative interface. It is primarily intended for use in:

  • Import scripts
  • Export scripts
  • Cross-application data migrations

At its core, the library's operation is quite simple: loop over the rows of the source data, perform some transformations, and output the transformed data to the sink.

# Many types can be used as sources and sinks, including CSV files and Sequel Datasets
source = CSV.read("data_source.csv", headers:true)
sink = DB[:destination_table]

# Migrations define how data from the source is transformed to go into the sink
Micdrop.migrate source, sink do
    take "Name", put: :name
    take "Birth Date" do
        parse_date "%m/%d/%y"
        format_date "%Y-%m-%d"
        put :dob
    end
    take "Deceased?" do
        parse_boolean
        default false
        put :is_deceased
    end
end

Note:

This is a re-implementation in Ruby of my previous attempt at this concept in Python. Ruby provides a far superior syntax for this concept than Python. This version does not yet have the full feature set of the Python version, but is still quite usable.

Terminology

  • Source: A source of data at the beginning of a pipeline; a sequence of multiple Records.
  • Sink: The final destination in which Records are to be stored after their transformations.
  • Record: A single record in a Source. (For example, a database row.)
  • Item: A sub-component of a Record. (Such as a database column. Items may also be Records themselves if the parent Record represents structured data such as JSON.)
  • Take: Extract a single Item from a Record or Collector.
  • Put: Deposit a single Item into a Collector.
  • Collector: Similar to a Record, but intended to be filled by the migration rather than coming from the source. (A single Collector exists by default, which will be pushed to the Sink. However, you can also use manually-created Collectors as Items to build up hierarchical structures.)

For the curious, the library name is partially an abbreviation of "Migrate, Import, and Convert".

Usage

Before we can begin a migration, we need a source and a sink.

Essentially, any Ruby object which meets the following criteria can be used as a source:

  • The object responds to :each, :each_with_index, and/or :each_pair (so, any Enumerable works)
  • The items yielded by :each and friends respond to :[]

A sink is similar, but has a single criterion: it must respond to :<<.

By default, the :<< method of the sink will receive a hash. However, if another object is needed, the sink may optionally implement :make_collector to return another object instead. The collector must respond to :[]= but otherwise may be any object you wish.
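The criteria above are plain duck typing, so they can be checked without Micdrop at all. The following is a minimal sketch: `RangeSource` and `RecordingSink` are hypothetical classes invented for illustration, and the hand-written loop at the end only stands in for the kind of work a migration automates.

```ruby
# Hypothetical source: yields records that respond to #[]
class RangeSource
  include Enumerable

  def initialize(count)
    @count = count
  end

  # Yields one hash per record
  def each
    return to_enum(:each) unless block_given?

    (1..@count).each { |n| yield({ id: n, square: n * n }) }
  end
end

# Hypothetical sink: the only requirement is #<<
class RecordingSink
  attr_reader :rows

  def initialize
    @rows = []
  end

  def <<(collected)
    @rows << collected
    self # returning self allows `sink << a << b`
  end
end

source = RangeSource.new(3)
sink = RecordingSink.new

# Duck-typing checks mirroring the criteria listed above
source_ok = source.respond_to?(:each) && source.first.respond_to?(:[])
sink_ok = sink.respond_to?(:<<)

# Hand-written loop: read each record, rename the keys, push to the sink
source.each do |record|
  sink << { "ID" => record[:id], "Square" => record[:square] }
end
```

Any object shaped like these two should be usable as a source or sink according to the criteria above.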

Simple Migrations

Let's begin with the simplest possible migration:

# Many objects can be used as sources. While dedicated source classes exist for more complex
# use-cases, even a simple array of hashes can be used as a source.
source = [
    {a:1, b:2},
    {a:3, b:4},
    {a:5, b:6},
]

# Likewise, a simple array can also be used as a sink, though special classes exist for more 
# complex use-cases.
sink = []

Micdrop.migrate source, sink do # This block is executed for every record in the source
    # If no conversion is needed, you can simply Take items and Put them in the appropriate place
    take :a, put: "A"
    take :b, put: "B"
end

# `sink` now looks like this:
[
    {"A"=>1, "B"=>2},
    {"A"=>3, "B"=>4},
    {"A"=>5, "B"=>6},
]

Of course, data rarely maps so cleanly in the real world; conversion is usually needed. Adding a block to the Take allows you to specify transforms:

source = [
    {a:"Yes", b:"08/07/22", c:"Stuff"},
    {a:"Yes", b:"24/04/24", c:"Things"},
    {a:"No", b:"11/12/21", c:nil},
]
sink = []

Micdrop.migrate source, sink do
    take :a, put: "A" do
        parse_boolean
    end
    take :b, put: "B" do
        # We'll parse the date from a day/month/year string, and then format it in the new format
        parse_date "%d/%m/%y"
        format_date "%Y-%m-%d"
    end
    take :c  do
        default "Whatsit"
        # The Put can optionally be specified in the block body rather than as a method parameter
        put "C"
    end
    # Method chaining is also allowed. The previous block could alternatively have been written as:
    take(:c).default("Whatsit").put("C")
end

# `sink` now looks like this:
[
    {"A"=>true, "B"=>"2022-07-08", "C"=>"Stuff"},
    {"A"=>true, "B"=>"2024-04-24", "C"=>"Things"},
    {"A"=>false, "B"=>"2021-12-11", "C"=>"Whatsit"},
]

Each block acts as a pipeline, with each transform being executed sequentially and modifying the value in-place. Your pipelines can be arbitrarily complex, and even include multiple Puts at different stages of the pipeline.
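For comparison, here is roughly what the date pipeline does when written by hand. This sketch assumes that `parse_date` and `format_date` behave like Ruby's `Date.strptime` and `Date#strftime`, which the README does not state. It also reads the sample dates as day/month/year, the only reading under which "24/04/24" is a valid date.

```ruby
require "date"

raw_dates = ["08/07/22", "24/04/24", "11/12/21"]

converted = raw_dates.map do |raw|
  value = raw                               # the value taken from the record
  value = Date.strptime(value, "%d/%m/%y")  # step 1: String -> Date
  value = value.strftime("%Y-%m-%d")        # step 2: Date -> String
  value                                     # what would be put into the collector
end
```

Each step replaces the current value, which is the same sequential behavior the pipeline block describes.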

Destructuring Data

If your source data is more structured, you can use scope and extract to navigate the tree:

source = [
    {some:{deeply:{nested:{data: "1", stuff: "2"}}, other: "3"}},
    {some:{deeply:{nested:{data: "4", stuff: "5"}}, other: "6"}},
    {some:{deeply:{nested:{data: "7", stuff: "8"}}, other: "9"}},
]
sink = []

Micdrop.migrate source, sink do
    take :some do
        scope do
            # The `scope` method prevents operations in this block from affecting the value in 
            # the outer `take` block. The `extract` method traverses down the tree.
            extract :other
            put :some_other
        end
        scope do
            # `extract` can be used multiple times to go down multiple levels
            extract :deeply
            extract :nested
            # And scopes can be nested
            scope do
                extract :data
                put :some_deeply_nested_data
            end
            scope do
                extract :stuff
                put :some_deeply_nested_stuff
            end
        end
    end
    # If you only need a single item in a deeply nested structure, you can chain all the methods 
    # directly on the `take` as well
    take(:some).extract(:deeply).extract(:nested).extract(:stuff).put(:some_deeply_nested_stuff)
    # Or even use `take_dig`
    take_dig :some, :deeply, :nested, :stuff, put: :some_deeply_nested_stuff
end
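The example above does not show the resulting sink. As a rough guide, the plain-Ruby sketch below builds the hash one would expect to be collected for the first record, using `Hash#dig` for the traversal. It illustrates the intended result only and makes no claim about how `extract` or `take_dig` are implemented.

```ruby
record = { some: { deeply: { nested: { data: "1", stuff: "2" } }, other: "3" } }

# Each key mirrors one `put` in the migration above
flattened = {
  some_other: record.dig(:some, :other),
  some_deeply_nested_data: record.dig(:some, :deeply, :nested, :data),
  some_deeply_nested_stuff: record.dig(:some, :deeply, :nested, :stuff),
}
```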

Outputting Structured Data

By default, Micdrop assumes your output data follows a normal row/column structure, rather than containing complex structures. Micdrop has some limited support for building up structure, though more complex tools are in the works for the future.

The collect_list method is currently the primary supported way of building up structure. It takes multiple takes and allows them to be operated on in a single pipeline:

source = [
    {person: 1, home_phone: nil, work_phone: "(354) 756-4796", cell_phone: "(234) 678-7564"},
    {person: 2, home_phone: "(867) 123-9748", work_phone: nil, cell_phone: "(475) 364-8365"},
]
sink = []

Micdrop.migrate source, sink do
    take :person, put: :person_id
    collect_list(take(:home_phone), take(:work_phone), take(:cell_phone)) do
        # Here, the value is a list containing the values of all three `take`s
        # We can remove the nil values from the list
        compact
        # Then join the remaining as a JSON-formatted list
        format_json
        put :phones
    end
end

# `sink` now looks like this:
[
    {person_id: 1, phones: '["(354) 756-4796", "(234) 678-7564"]'},
    {person_id: 2, phones: '["(867) 123-9748", "(475) 364-8365"]'},
]

There are several other methods that are useful for operating on collected lists as well, such as filter, map, coalesce, and map_apply.
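Written by hand, the `collect_list` pipeline above amounts to building an array, dropping the nils, and serializing it. The sketch below assumes `compact` and `format_json` correspond to `Array#compact` and the standard `json` library; the exact whitespace of the library's JSON output is not assumed here.

```ruby
require "json"

record = { person: 1, home_phone: nil, work_phone: "(354) 756-4796", cell_phone: "(234) 678-7564" }

# The collected list: one entry per take, in order
phones = [record[:home_phone], record[:work_phone], record[:cell_phone]]
phones = phones.compact              # drop the nil entries
phones_json = JSON.generate(phones)  # serialize the remainder as a JSON array

row = { person_id: record[:person], phones: phones_json }
```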

In addition to collect_list, there is also collect_kv which takes a hash of takes as the first argument:

Micdrop.migrate source, sink do
    take :person, put: :person_id
    collect_kv({"Home"=>take(:home_phone), "Work"=>take(:work_phone), "Cell"=>take(:cell_phone)}) do
        # Here, the value is a hash containing the values of all three `take`s
    end
end

And also collect_format_string, which collects multiple items into a format string:

Micdrop.migrate source, sink do
    take :person, put: :person_id
    collect_format_string("Home: %s, Work: %s, Cell: %s", take(:home_phone), take(:work_phone), take(:cell_phone)) do
        # Here, the value is a string with the `take`n values inserted
    end
end
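Neither of the two examples above shows the value the block starts with. The plain-Ruby sketch below shows what those values would plausibly look like for the second source record. It assumes `collect_format_string` follows the rules of `Kernel#format`, under which a nil argument renders as an empty string for `%s`; the README does not confirm this.

```ruby
record = { person: 2, home_phone: "(867) 123-9748", work_phone: nil, cell_phone: "(475) 364-8365" }

# Roughly the hash a collect_kv block would start with
by_label = {
  "Home" => record[:home_phone],
  "Work" => record[:work_phone],
  "Cell" => record[:cell_phone],
}

# Roughly the string a collect_format_string block would start with
summary = format(
  "Home: %s, Work: %s, Cell: %s",
  record[:home_phone], record[:work_phone], record[:cell_phone]
)
```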

Finally, you can also create nested structures on the fly using the put_bury method in place of the normal put.

Micdrop.migrate source, sink do
    take :person, put: :person_id
    take :home_phone do
        put_bury :phones, :home unless value.nil?
    end
    take :work_phone do
        put_bury :phones, :work unless value.nil?
    end
    take :cell_phone do
        put_bury :phones, :cell unless value.nil?
    end
end

# `sink` now looks like this:

[
    { person_id: 1, phones: {work: "(354) 756-4796", cell: "(234) 678-7564"} },
    { person_id: 2, phones: {home: "(867) 123-9748", cell: "(475) 364-8365"} },
]

put_bury operates similarly to Ruby's standard dig method, but in reverse.
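To make the "dig in reverse" idea concrete, here is a small stand-alone sketch. The `bury` helper is hypothetical: it is not part of Micdrop or of Ruby's standard library, and it only illustrates the behavior described above.

```ruby
# Hypothetical helper: walk (and create) nested hashes along `keys`,
# then store `value` under the last key.
def bury(hash, *keys, value)
  *path, last = keys
  target = path.reduce(hash) { |node, key| node[key] ||= {} }
  target[last] = value
  hash
end

collected = { person_id: 1 }
bury(collected, :phones, :work, "(354) 756-4796")
bury(collected, :phones, :cell, "(234) 678-7564")

# Reading the value back with dig follows the same key path
cell = collected.dig(:phones, :cell)
```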

Creating Multiple Output Records

For instances where a single source record maps to multiple sink records, there are techniques for outputting multiple records. The first is simply to use flush.

source = [
    {person: 1, home_phone: "(634) 654-2457", work_phone: "(354) 756-4796", cell_phone: "(234) 678-7564"},
    {person: 2, home_phone: "(867) 123-9748", work_phone: "(234) 534-2667", cell_phone: "(475) 364-8365"},
]
sink = []

Micdrop.migrate source, sink do
    take :person, put: :person_id
    take :home_phone, put: :number
    static "Home", put: :type
    flush # This creates the first record and resets the collector
    # Now we start the second record
    take :person, put: :person_id
    take :work_phone, put: :number
    static "Work", put: :type
    flush 
    # And the third record
    take :person, put: :person_id
    take :cell_phone, put: :number
    static "Cell", put: :type
    # There is an implicit flush at the end of the block, so we don't need an explicit one (though it won't hurt anything)
end

# `sink` now looks like this:
[
    {person_id: 1, number: "(634) 654-2457", type: "Home"},
    {person_id: 1, number: "(354) 756-4796", type: "Work"},
    {person_id: 1, number: "(234) 678-7564", type: "Cell"},
    {person_id: 2, number: "(867) 123-9748", type: "Home"},
    {person_id: 2, number: "(234) 534-2667", type: "Work"},
    {person_id: 2, number: "(475) 364-8365", type: "Cell"},
]

flush takes an optional reset parameter that is true by default. If set to false, the output will still be generated, but the collector will not be reset.
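The effect of the reset parameter can be pictured with a toy model. The lambda below is a hypothetical stand-in written for this illustration; whether Micdrop copies the collector or hands over the same object is not stated in the README, so the `dup` is an assumption.

```ruby
sink = []
collector = {}

# Toy stand-in for flush: emit the collected values, then optionally reset
flush = lambda do |reset: true|
  sink << collector.dup
  collector.clear if reset
end

collector[:person_id] = 1
collector[:number] = "(634) 654-2457"
collector[:type] = "Home"
flush.call(reset: false) # emit the record but keep the collected values

# :person_id is still present, so only the changed fields are put again
collector[:number] = "(354) 756-4796"
collector[:type] = "Work"
flush.call # emit and reset
```

In this model, skipping the reset is useful when consecutive output records share most of their fields.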

In cases where iteration is desired, each_subrecord provides a convenient interface:

source = [
    {person: 1, addresses: [{line1: "123 Example St.", city: "Anytown", state: "AL", zip: "12345", role: "Mailing"}]},
    {person: 2, addresses: [{line1: "123 Any Way", city: "Thereabouts", state: "AK", zip: "98765", role: "Home"}, {line1: "PO Box 123", city: "Thereabouts", state: "AK", zip: "98765", role: "Mailing"}]},
]
sink = []

Micdrop.migrate source, sink do
    # Save this so we can `put` it separately in each record
    person_id = take :person
    # Iterate each address, and automatically flush and reset after each
    take(:addresses).each_subrecord flush: true, reset: true do
        person_id.put :person_id
        take :line1, put: :line1
        take :city, put: :city
        take :state, put: :state
        take :zip, put: :zip
        take :role, put: :role
    end
end

# `sink` now looks like this:
[
    {person_id: 1, line1: "123 Example St.", city: "Anytown", state: "AL", zip: "12345", role: "Mailing"},
    {person_id: 2, line1: "123 Any Way", city: "Thereabouts", state: "AK", zip: "98765", role: "Home"},
    {person_id: 2, line1: "PO Box 123", city: "Thereabouts", state: "AK", zip: "98765", role: "Mailing"},
]
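The same one-to-many expansion can be written by hand with `flat_map`, which may help when checking the output of an `each_subrecord` migration. This is a plain-Ruby equivalent on trimmed-down sample data, not a description of how `each_subrecord` works internally.

```ruby
source = [
  { person: 1, addresses: [{ line1: "123 Example St.", role: "Mailing" }] },
  { person: 2, addresses: [{ line1: "123 Any Way", role: "Home" }, { line1: "PO Box 123", role: "Mailing" }] },
]

# One output row per address, each carrying the parent's person id
rows = source.flat_map do |record|
  record[:addresses].map do |address|
    { person_id: record[:person] }.merge(address.slice(:line1, :role))
  end
end
```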

There may also be cases where multiple sinks are needed, rather than merely multiple records in the same sink. For this use case, it is recommended to simply iterate the same source multiple times, once for each sink.

source = [
    {id: 1, first_name: "Alice", last_name: "Anderson", mail_line1: "123 Example St.", mail_city: "Anytown", mail_state: "AL", mail_zip: "12345"},
    {id: 2, first_name: "Bob", last_name: "Benson", mail_line1: "PO Box 123", mail_city: "Thereabouts", mail_state: "AK", mail_zip: "98765"},
]
person_sink = []
address_sink = []

Micdrop.migrate source, person_sink do
    take :id, put: :id
    take :first_name, put: :fname
    take :last_name, put: :lname
end

Micdrop.migrate source, address_sink do
    take :id, put: :person_id
    take :mail_line1, put: :line1
    take :mail_city, put: :city
    take :mail_state, put: :state
    take :mail_zip, put: :zip
    static "Mailing", put: :role
end

Filling the Gaps

If you find yourself writing the same block multiple times, you can instead write it as a proc and apply that to the Takes.

source = [
    {a:1, b:2},
    {a:nil, b:4},
    {a:5, b:nil},
]
sink = []

default_0 = proc do
    # This reusable pipeline can be as complex as needed
    default 0
end

Micdrop.migrate source, sink do
    # Both of the following syntaxes are equivalent
    take :a, apply: default_0, put: "A"
    take :b do
        apply default_0
        put "B"
    end
end

# `sink` now looks like this:
[
    {"A"=>1, "B"=>2},
    {"A"=>0, "B"=>4},
    {"A"=>5, "B"=>0},
]

Of course, you may need to modify data in ways that are not supported by existing transforms. In that case, you can use plain old Ruby to fill the gaps. There are a few ways to do this:

source = [
    {a:1, b:2, c:3},
    {a:4, b:5, c:6},
    {a:7, b:8, c:9},
]
sink = []

Micdrop.migrate source, sink do
    # You can pass a proc (or symbol) to the `convert` parameter
    take :a, convert: proc {it + 1}, put: "A"
    # Or you can use a `convert` block
    take :b do
        convert {it * 2}
        put "B"
    end
    # Or you can use the `update` and `value` methods directly in the main item block
    take :c do
        if value.odd?
            update "Odd"
        else
            update "Even"
        end
        put "C"
    end
end

# `sink` now looks like this:
[
    {"A"=>2, "B"=>4, "C"=>"Odd"},
    {"A"=>5, "B"=>10, "C"=>"Even"},
    {"A"=>8, "B"=>16, "C"=>"Odd"},
]
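Because the item block is ordinary Ruby, Ruby's truthiness rules apply when branching on `value`. In particular, `0` is truthy in Ruby, so a parity test written as `value % 2` always takes the first branch; `Integer#odd?` gives the intended result. A short stand-alone demonstration:

```ruby
labels = [3, 6, 9].map do |value|
  by_modulo = (value % 2) ? "Odd" : "Even" # always "Odd": 0 and 1 are both truthy
  by_odd    = value.odd? ? "Odd" : "Even"  # true only for odd integers
  [by_modulo, by_odd]
end
```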

And transforms are nothing more than standard Ruby methods; there is no magic going on under the hood (other than the normal Ruby magic). So, if you find yourself needing the same pure-Ruby code often, you can just extend ItemContext with an additional method, which can then be used as a transform.

module Micdrop
    class ItemContext
        def subtract(v)
            # Do whatever you like here; just make sure to save the result to @value
            @value = @value - v
            # Also return `self` to enable method chaining
            self
        end
    end
end

Micdrop.migrate source, sink do
    take :a do
        subtract 1
        put "A"
    end
end
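The mechanics behind reusable procs and custom transforms can be tried out without Micdrop. `ToyItemContext` below is a hypothetical stand-in, far simpler than the real ItemContext. Using `instance_exec` to run a proc against the context is an assumption about how such DSLs are commonly built, not a statement about Micdrop's internals.

```ruby
# Toy stand-in for illustration only
class ToyItemContext
  attr_reader :value

  def initialize(value)
    @value = value
  end

  # Replace nil with a fallback value
  def default(fallback)
    @value = fallback if @value.nil?
    self
  end

  # Run a reusable proc with `self` set to this context
  def apply(pipeline)
    instance_exec(&pipeline)
    self
  end
end

# Reopen the class later to add a custom transform
class ToyItemContext
  def subtract(amount)
    @value -= amount
    self # returning self keeps the method chainable
  end
end

default_0 = proc { default 0 }

a = ToyItemContext.new(nil).apply(default_0).subtract(1).value
b = ToyItemContext.new(5).apply(default_0).subtract(1).value
```

Here `a` starts as nil, is defaulted to 0, and ends at -1, while `b` skips the default and ends at 4.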

If needed, you can also use the before_flush or after_flush hooks to add actions before or after the flush. Both take the same form:

Micdrop.migrate source, sink do
    after_flush do |record, collected|
        # `record` is the RootRecordContext, `collected` is the hash of `put` values.
        # For example, you could do something like this if the sink was a Sequel InsertSource
        puts "Inserted ID #{record.sink.insert_id} with data #{collected.inspect}"
    end
    # Then do your normal migration operations here
end
