找到你要的答案

Q:Count the current week records from multiple table in Postgresql and Talend

Q:在Postgresql,人才多表算本周的记录

I have multiple tables in Postgresql. Say , A table has column A,column B,column C,refresh_date,fiscal week. B table has column D,column E,column B,fiscal week,refresh_date. I want to find the total count of records for the current week for table A and the total count of column E from table B for the current week. I am using Talend for data loading from table A and table B and those tables are in Postgresql. Also, if column E has values equal to zero for that current week then it should send a mail to myself. I want to create a generic code for this as this is for table A and table B, there will be multiple tables used similar to this. How to do this in Talend and Postgresql?

I have multiple tables in Postgresql. Say , A table has column A,column B,column C,refresh_date,fiscal week. B table has column D,column E,column B,fiscal week,refresh_date. I want to find the total count of records for the current week for table A and the total count of column E from table B for the current week. I am using Talend for data loading from table A and table B and those tables are in Postgresql. Also, if column E has values equal to zero for that current week then it should send a mail to myself. I want to create a generic code for this as this is for table A and table B, there will be multiple tables used similar to this. How to do this in Talend and Postgresql?

answer1: 回答1:

I've created something like your need a few months ago, and it basically executes a random query and parses the resultset and stores it denormalized in a database table. Note that I use enterprise Talend that has a neat feature called Dynamic Schema: https://help.talend.com/pages/viewpage.action?pageId=190513179

So where should we start? My typical query looks like this:

select pk1 as rcr_grby_pk1, pk2 as rcr_grpby_pk2, 
       count(*) as cnt, sum(amount) as sum_amount
from mySchema.myTable
group by pk1, pk2

Obviously the Select query could be anything, can contain any number of columns. We execute it and store the results in a table that would look like this:

--------------------------------------------------------
|  schema  |  table  |  pk     |measure_name| value_n  |
--------------------------------------------------------
| mySchema | myTable | foo2015 | cnt        | 1234     |
--------------------------------------------------------
| mySchema | myTable | foo2015 | sum_amount | 987.65   |
--------------------------------------------------------
| mySchema | myTable | bar2014 | cnt        | 4321     |
--------------------------------------------------------
| mySchema | myTable | bar2014 | sum_amount | 567.89   |
--------------------------------------------------------

We've distinguished the 3 basic types: text, numbers, dates.

I guess you can have an idea how to write and store these SQL queries, along with an ID that could be passed to talend and stored in the target table, so you can see what generated that result.

So the processing piece. tFlowToIterate -> tJavaFlex -> tLogRow

I've placed everything into a joblet because we are using this to reconcile data between different databases. (e.g. Oracle and Postgres) Joblet content:

tJavaFlex has an output schema like this:

tJavaFlex content is like this:

begin:

Dynamic record = ((Dynamic)globalMap.get("input.line"));

String group_by_columns = "";

for(int i = 0 ; i < record.getColumnCount() ; i++) {
    DynamicMetadata meta = record.getColumnMetadata(i);
    if(meta.getDbName().toUpperCase().startsWith("RCR_GRPBY_") ){
        group_by_columns  += "" + record.getColumnValue(i);
    }
}
for(int i = 0 ; i < record.getColumnCount() ; i++) {
    DynamicMetadata meta = record.getColumnMetadata(i);
    if(false == meta.getDbName().toUpperCase().startsWith("RCR_GRPBY_") ){

main:

out.grp_id = context.grp_id;
out.job_id = context.job_id;
out.table_test_id = context.table_test_id;

out.group_by_columns = group_by_columns;
out.measure_column_name = meta.getDbName().toUpperCase();

out.result_n = /* Float */ null;
out.result_v = /* String */ null;
out.result_d = /* Date */ null;

if( (record.getColumnValue(i)!=null) && ( meta.getType().equals("id_String") ) ){
    out.result_v = String.valueOf(record.getColumnValue(i));
} else if( (record.getColumnValue(i)!=null) && meta.getType().equals("id_Date") ) {
    System.out.println(String.valueOf(record.getColumnValue(i)));   
    out.result_d = (Date)record.getColumnValue(i);
} else if( (record.getColumnValue(i)!=null) && ( meta.getType().equals("id_Integer") 
        || meta.getType().equals("id_Double") 
        || meta.getType().equals("id_Float")
        || meta.getType().equals("id_Long") )) {
    out.result_n = Float.valueOf(String.valueOf(record.getColumnValue(i)));
} else if( (record.getColumnValue(i)!=null) && ( meta.getType().equals("id_BigDecimal") ) ) {
    out.result_n = new BigDecimal( String.valueOf(record.getColumnValue(i)) ).floatValue();
} else {
    //Should not happen
    System.out.println("\n Unhandled type: " + meta.getType() );    
}

end:

    } // if
} //for

PS: I know Float is a bad choice to store numbers, but hadn't had time to rework it, and it still gives acceptable results.

I've created something like your need a few months ago, and it basically executes a random query and parses the resultset and stores it denormalized in a database table. Note that I use enterprise Talend that has a neat feature called Dynamic Schema: https://help.talend.com/pages/viewpage.action?pageId=190513179

So where should we start? My typical query looks like this:

select pk1 as rcr_grby_pk1, pk2 as rcr_grpby_pk2, 
       count(*) as cnt, sum(amount) as sum_amount
from mySchema.myTable
group by pk1, pk2

显然,选择查询可以是任意的,可以包含任意数量的列。我们执行它,并将结果存储在一个表,将看起来像这样:

--------------------------------------------------------
|  schema  |  table  |  pk     |measure_name| value_n  |
--------------------------------------------------------
| mySchema | myTable | foo2015 | cnt        | 1234     |
--------------------------------------------------------
| mySchema | myTable | foo2015 | sum_amount | 987.65   |
--------------------------------------------------------
| mySchema | myTable | bar2014 | cnt        | 4321     |
--------------------------------------------------------
| mySchema | myTable | bar2014 | sum_amount | 567.89   |
--------------------------------------------------------

我们区分了3种基本类型:文本、数字、日期。

我想你可以知道如何写和存储这些SQL查询,随着一个ID可以通过人才和存储在目标表,这样你就可以看到生成的结果。

So the processing piece. tFlowToIterate -> tJavaFlex -> tLogRow

I've placed everything into a joblet because we are using this to reconcile data between different databases. (e.g. Oracle and Postgres) Joblet content:

tjavaflex具有输出模式这样:

tjavaflex内容是这样的:

开始:

Dynamic record = ((Dynamic)globalMap.get("input.line"));

String group_by_columns = "";

for(int i = 0 ; i < record.getColumnCount() ; i++) {
    DynamicMetadata meta = record.getColumnMetadata(i);
    if(meta.getDbName().toUpperCase().startsWith("RCR_GRPBY_") ){
        group_by_columns  += "" + record.getColumnValue(i);
    }
}
for(int i = 0 ; i < record.getColumnCount() ; i++) {
    DynamicMetadata meta = record.getColumnMetadata(i);
    if(false == meta.getDbName().toUpperCase().startsWith("RCR_GRPBY_") ){

主要:

out.grp_id = context.grp_id;
out.job_id = context.job_id;
out.table_test_id = context.table_test_id;

out.group_by_columns = group_by_columns;
out.measure_column_name = meta.getDbName().toUpperCase();

out.result_n = /* Float */ null;
out.result_v = /* String */ null;
out.result_d = /* Date */ null;

if( (record.getColumnValue(i)!=null) && ( meta.getType().equals("id_String") ) ){
    out.result_v = String.valueOf(record.getColumnValue(i));
} else if( (record.getColumnValue(i)!=null) && meta.getType().equals("id_Date") ) {
    System.out.println(String.valueOf(record.getColumnValue(i)));   
    out.result_d = (Date)record.getColumnValue(i);
} else if( (record.getColumnValue(i)!=null) && ( meta.getType().equals("id_Integer") 
        || meta.getType().equals("id_Double") 
        || meta.getType().equals("id_Float")
        || meta.getType().equals("id_Long") )) {
    out.result_n = Float.valueOf(String.valueOf(record.getColumnValue(i)));
} else if( (record.getColumnValue(i)!=null) && ( meta.getType().equals("id_BigDecimal") ) ) {
    out.result_n = new BigDecimal( String.valueOf(record.getColumnValue(i)) ).floatValue();
} else {
    //Should not happen
    System.out.println("\n Unhandled type: " + meta.getType() );    
}

结束:

    } // if
} //for

我知道浮动是一个糟糕的选择存储数字,但没有时间返工,它仍然提供了可接受的结果。

postgresql  generics  count  talend  dayofweek