Chapter 2 fi_SUA-FBS plugin R code

The code for the ‘fi_SUA-FBS’ plugin can be downloaded from the SWS at the plugin tab to check the current latest version. The plugin is composed by one main file and three additional functions. A paragraph is dedicated to each of them.

2.1 Main file

The first part of the main file is dedicated to library loading and parameter selection:

  • The structure of the four session used is stored in the sessionKey_ objects.

  • the function flagMethodAss provides the appropariate flagMethod depending on the flagObservationStatus of the figure.

  • The country parameter is optional therefore if the parameter is null or has length zero the countries from the session of the FBS FIAS dataset are pulled (sessionCountry object).

  • The start and end years are mandatory parameters and all the selected years are stored in the yearVals object.

library(faosws)
library(faoswsUtil)
library(faoswsProcessing)
library(data.table)
library(sendmailR)
library(zoo)

# -- Token QA ----
if(CheckDebug()){
  
  library(faoswsModules)
  SETTINGS = ReadSettings("sws.yml")
  
  ## If you're not on the system, your settings will overwrite any others
  R_SWS_SHARE_PATH = SETTINGS[["share"]]
  
  ## Define where your certificates are stored
  SetClientFiles(SETTINGS[["certdir"]])
  
  ## Get session information from SWS. Token must be obtained from web interface
  GetTestEnvironment(baseUrl = SETTINGS[["server"]],
                     token = 'b1193433-3a61-4ee1-9f26-a54a8a6b447b')
  
}


flagMethodAss <- function(fos){
  
  
  if(is.na(fos)){
    fm <- '-'
  } else {
  
  fos <- as.character(fos)
  
  if(fos == 'E'){
    fm <- 'f'
  } else if(fos == 'I'){
    fm <- 'i'
  }  else {
    fm <- '-'
  }
}
  return(as.character(fm))
}

# -- Parameters ----

message("fi_SUA-FBS: Getting parameters")
# Paramenters from session
sessionKey_fbsFias = swsContext.datasets[[1]]
sessionKey_suaUnb = swsContext.datasets[[2]]
sessionKey_suabal = swsContext.datasets[[3]]
sessionKey_fbsFaostat = swsContext.datasets[[4]]

# Countries either from session or from parameters
countryPar <-  swsContext.computationParams$countries
print(countryPar)
if(!is.null(countryPar) & length(countryPar) > 0){
  countryPar <- swsContext.computationParams$countries
  sessionCountry <- strsplit(countryPar, ', ')[[1]]
} else {
  sessionCountry <- swsContext.datasets[[1]]@dimensions$geographicAreaM49_fi@keys
  countries <- GetCodeList("FisheriesCommodities", "fi_fbs_fias", "geographicAreaM49_fi")[ type == 'country']$code
  # Make sure only countries not areas
  sessionCountry <- sessionCountry[sessionCountry %in% countries]
}
message(paste("fi_SUA-FBS: countries selected ", paste0(sessionCountry, collapse = ', '), '.', sep = ''))

# Mandatory year values
maxyear <- as.numeric(swsContext.computationParams$maxyear)
minyear <- as.numeric(swsContext.computationParams$minyear)
yearVals <- as.character(minyear:maxyear)
year <- max(as.numeric(yearVals))

The data from the initial datasets (Global production and Commodity) can now be retrieved. The ‘fisheriesCatchArea’ dimension of the Global production dataset is aggregated and the production code is made coherent with the rest of the data. All the observation flags, before aggregation, are transformed into factor objects so to make aggregation and flag selection easier. Method flag are usually set to ‘s’ being the flag for sum elements. The production of Channel Islands and Isle of Man are added to the UK production, the same happens with the COmmodity dataset.


# -- Needed datasets ----

## Get global production (from Production environment)

message("fi_SUA-FBS: Pulling data from Global production")

KeyGlobal <- DatasetKey(domain = "Fisheries", dataset = "fi_global_production", dimensions = list(
  geographicAreaM49_fi = Dimension(name = "geographicAreaM49_fi", keys = sessionCountry),
  fisheriesAsfis = Dimension(name = "fisheriesAsfis", keys = GetCodeList("Fisheries", "fi_global_production","fisheriesAsfis" )[,code]),
  fisheriesCatchArea = Dimension(name = "fisheriesCatchArea", keys = GetCodeList("Fisheries", "fi_global_production","fisheriesCatchArea" )[,code]),
  measuredElement = Dimension(name = "measuredElement", keys = c("FI_001")),
  timePointYears = Dimension(name = "timePointYears", keys = yearVals )))

globalProduction <- GetData(KeyGlobal)

# Add Channel Is and Isle of Man to UK
globalProduction[geographicAreaM49_fi %in% c('830','833'), geographicAreaM49_fi := '826']

# Aggregate by fisheriesCatchArea
# Convert flags into ordinal factor so that simple aggregation is possible
# The function aggregateObservationFlag is too slow so flag are transformed into factors

globalProduction$flagObservationStatus <- factor(globalProduction$flagObservationStatus, 
                                                 levels = c('M', 'O', 'N', '', 'X', 'T', 'E', 'I'), 
                                                 ordered = TRUE)
if(nrow(globalProduction)){
globalProduction <- globalProduction[ , list(ValueAggr = sum(Value, na.rm = TRUE), 
                                             flagObservationStatusAggr = max(flagObservationStatus),
                                             flagMethodAggr = flagMethodAss(max(flagObservationStatus))),
                                      by=c("geographicAreaM49_fi",
                                           "fisheriesAsfis",
                                           "measuredElement",
                                           "timePointYears")]
}
setnames(globalProduction, names(globalProduction), c("geographicAreaM49_fi", "fisheriesAsfis",
                                                      "measuredElement", "timePointYears",
                                                      "Value", "flagObservationStatus",
                                                      "flagMethod"))

# Hard code change from FI_001 to 5510, both are Production in tonnes.
globalProduction$measuredElement <- ifelse(globalProduction$measuredElement == "FI_001", "5510", globalProduction$measuredElement)
if(any(globalProduction$measuredElement != "5510") ){
  message("Not all the elements in Global production dataset are FI_001")
}

The commodity data are loaded by country for monitoring purposes, all the re-export data are transformed in export (SUA do not contains re-exports) and the data in units and not in tonnes (if any) are discarded. The value elements are set aside as they do not go through all the process as quantities. The mapping table for species and commodities are loaded.


## Get Commodities data (from QA environment)

message("fi_SUA-FBS: Pulling data from Commodites dataset")

commodityDB0 <- data.table()

for(i in 1:length(sessionCountry)){
  KeyComm <- DatasetKey(domain = "Fisheries Commodities", dataset = "commodities_total", dimensions = list(
    geographicAreaM49_fi = Dimension(name = "geographicAreaM49_fi", keys = sessionCountry[i]),
    measuredItemISSCFC = Dimension(name = "measuredItemISSCFC", keys = GetCodeList("FisheriesCommodities", "commodities_total","measuredItemISSCFC" )[,code]),
    measuredElement = Dimension(name = "measuredElement", keys = GetCodeList("FisheriesCommodities", "commodities_total","measuredElement" )[,code]),
    timePointYears = Dimension(name = "timePointYears", keys = yearVals )))
  
  commodityDBchunk <- GetData(KeyComm)
  commodityDB0 <- rbind(commodityDB0, commodityDBchunk)
  print(i)
}

# Add Channel Is and Isle of Man to UK
commodityDB0[geographicAreaM49_fi %in% c('830','833'), geographicAreaM49_fi := '826']


commodityDB0$flagObservationStatus <- factor(commodityDB0$flagObservationStatus,
                                            levels = c('M', 'O', 'N', '', 'X', 'T', 'E', 'I'), 
                                            ordered = TRUE)

# Re-export in Export (quantity and values)
commodityDB0[measuredElement == '5912', measuredElement := '5910'] # quantity
commodityDB0[measuredElement == '5923', measuredElement := '5922'] # Value in 1000$
commodityDB0[measuredElement == '5931', measuredElement := '5930'] # Unit value $/t


commodityDB0 <- commodityDB0[!measuredElement %in% c('5907', '5937', 
                                                     '5607', '5637',
                                                     '5906', '5940')]

# Other import

# Isolate prices (not entering all the processing)
ValueElements <- c('5930', '5630')
commodityDB <- commodityDB0
if(nrow(commodityDB) > 0){
# Aggregate re-export to export
commodityDB <- commodityDB[ , c("Value", 
                                "flagObservationStatus", 
                                "flagMethod") := list(sum(Value, na.rm = TRUE),
                                                      max(flagObservationStatus),
                                                      flagMethodAss(max(flagObservationStatus))),
                            by = c('geographicAreaM49_fi',
                                   'timePointYears',
                                   'measuredItemISSCFC',
                                   'measuredElement')]
}
# Get datatables that correspond to the EBX code
map_isscfc <- ReadDatatable('map_isscfc')
setnames(map_isscfc, "measured_item_isscfc", "measuredItemISSCFC")

map_asfis <- ReadDatatable('map_asfis')
setnames(map_asfis, "asfis", "fisheriesAsfis")

The Global production is merged with the mapping table to assign the ICS code to all the species and the species that do not have a correspondence are stored in a message that will be pasted in the final email and the not-mapped species are eventually discarded. The plugin also checks if there is any deviation in the ‘gp_mapping’ data table and takes them into account if any. The Global production is then aggregated by ICS code as required to enter the SUA.


# -- Start processing global production ----

message('fi_SUA-FBS: Start processing Global Production')

# Map to ICS
globalProductionMapping <- merge(globalProduction, map_asfis, by = c("fisheriesAsfis"), all.x = TRUE)

if(any(is.na(globalProductionMapping$ics))){
  notMappedSpecies <- unique(globalProductionMapping[is.na(ics)]$fisheriesAsfis)
  message(paste("These species are not mapped to any ICS group:", 
                paste(notMappedSpecies, collapse = ", ")))
  msg2email1 <- paste("These species are not mapped to any ICS group:", 
                      paste(notMappedSpecies, collapse = ", "))
  }
msg2email1 <- ifelse(any(is.na(globalProductionMapping$ics)), msg2email1, "")

message('Species - ICS mapping table for exceptions.')

# Load table of deviated species
new_map_asfis <- data.table()
for(i in seq(sessionCountry)){
    new_country <-  ReadDatatable('gp_mapping', where = paste("country = '", sessionCountry[i], "'", sep = ''))
    new_map_asfis <- rbind(new_map_asfis, new_country)
}

# Deviate species
if(nrow(new_map_asfis) > 0){ # if there is any change from default YBKlang mapping
  new_map_asfis[ end_year == 'LAST', end_year := as.character(year)]
  newMapping <- merge(globalProductionMapping, new_map_asfis, 
                      by.x = c('geographicAreaM49_fi', 'fisheriesAsfis'),
                      by.y = c('country', 'asfis'), all = TRUE, allow.cartesian = TRUE)
  
  unchanged <- newMapping[is.na(from_code)]
  unchanged <- rbind(unchanged,  newMapping[!is.na(from_code) & timePointYears > end_year | timePointYears < start_year])
  tochange <- newMapping[!is.na(from_code)]
  tochange <- tochange[ , c('timePointYears', 'end_year', 'start_year') := list(as.numeric(timePointYears), as.numeric(end_year), as.numeric(start_year))]
  tochange1 <- tochange[ timePointYears <= end_year & timePointYears >= start_year & ratio == 1]
  tochange1 <- tochange1[, ics := to_code]
  duplicate <- tochange[ timePointYears <= end_year & timePointYears >= start_year & ratio < 1]
  
  # Allow for splitting
  if(nrow(duplicate) > 0){
    duplicate[ timePointYears <= end_year & timePointYears >= start_year & ratio != 1, c('ics', 'Value') := list(to_code, Value * as.numeric(ratio))]
    duplicate[ , total := sum(as.numeric(ratio)), by = c('geographicAreaM49_fi', 'fisheriesAsfis', 'measuredElement', 'timePointYears','from_code', 'start_year', 'end_year')]
    
    if(nrow(duplicate[total < 1]) > 0){
      duplicate[ , diff := (1-total)]
      addMissingQuantities <- duplicate[diff != 0, ]
      addMissingQuantities[ , c('Value', 'ratio') := list(sum(Value), diff), 
                            by = c('geographicAreaM49_fi', 'fisheriesAsfis', 'measuredElement', 
                                   'timePointYears', 'from_code', 'start_year', 'end_year')]
      addMissingQuantities[ , c('Value', 'to_code', 'ics') := list((Value/total)*as.numeric(ratio), from_code, from_code)]
      setkey(addMissingQuantities)
      addMissingQuantities <- unique(addMissingQuantities)
      duplicate <- rbind(duplicate, addMissingQuantities)
      duplicate[ , diff := NULL]
    }
    duplicate[ , total := NULL]
  }
  changed <- rbind(tochange1, duplicate)
  gpMap_new <- rbind(unchanged, changed) 
  gpMap_new[ , c('from_code', 'to_code', 'start_year', 'end_year', 'ratio'):= NULL]
} else {  # if no change from default YBKlang mapping
  gpMap_new  <- globalProductionMapping
}

globalProductionAggr <- gpMap_new[, list(Value = sum(Value, na.rm = TRUE),
                                         flagObservationStatus = max(flagObservationStatus),
                                         flagMethod = flagMethodAss(max(flagObservationStatus))), 
                                  by = list(geographicAreaM49_fi,
                                            timePointYears,
                                            measuredElement,
                                            ics)]

# Remove ISSCAAP/ASFIS not mapped to any ICS (Sponges, Corals, Pearls): 
globalProductionAggr <- globalProductionAggr[!is.na(ics), ]

A similar process to the one just described for Global production is applied to the Commodity dataset but the aggregation happens after two additional steps: the link table and the other uses deviation.


# -- Start processing commodity DB ----

message('fi_SUA-FBS: Start processing commodity DB')
# Map to ICS
commodityDBIcs <- merge(commodityDB, map_isscfc, by = "measuredItemISSCFC")
commodityDBIcs$measuredItemISSCFC <- as.character(commodityDBIcs$measuredItemISSCFC)

# Check if all commodities are mapped
if(any(is.na(commodityDBIcs$ics))){
  
  notMappedCommodity <- unique(commodityDBIcs[is.na(ics)]$measuredItemISSCFC)
  message(paste("These species are not mapped to any ICS group:", 
                paste(notMappedCommodity, collapse = ", ")))
  msg2email1CDB <- paste("These species are not mapped to any ICS group:", 
                      paste(notMappedCommodity, collapse = ", "))
}
msg2email1CDB <- ifelse(any(is.na(commodityDBIcs$ics)), msg2email1CDB, "")

message('Commodity - ICS mapping table for exceptions.')

# Account for commodity deviation as for GP
new_map_isscfc <- data.table()

for(i in seq(sessionCountry)){
  new_country_isscfc <-  ReadDatatable('cdb_mapping', where = paste("country = '", sessionCountry[i], "'", sep = ''))
  new_map_isscfc <- rbind(new_map_isscfc, new_country_isscfc)
}

if(nrow(new_map_isscfc) > 0){
  new_map_isscfc[ end_year == 'LAST', end_year := as.character(year)]
  newMappingCDB <- merge(commodityDBIcs, new_map_isscfc,
                         by.x = c('geographicAreaM49_fi', 'measuredElement','measuredItemISSCFC'),
                         by.y = c('country', 'element','isscfc'), all = TRUE, allow.cartesian = TRUE)
  
  unchangedCDB <- newMappingCDB[is.na(from_code)]
  unchangedCDB <- rbind(unchangedCDB,  newMappingCDB[!is.na(from_code) & timePointYears > end_year | timePointYears < start_year])
  tochangeCDB <- newMappingCDB[!is.na(from_code)]
  tochangeCDB <- tochangeCDB[ , c('timePointYears', 'end_year', 'start_year') := list(as.numeric(timePointYears), as.numeric(end_year), as.numeric(start_year))]
  tochangeCDB1 <- tochangeCDB[ timePointYears <= end_year & timePointYears >= start_year & ratio == 1]
  tochangeCDB1 <- tochangeCDB1[, ics := to_code]
  duplicateCDB <- tochangeCDB[ timePointYears <= end_year & timePointYears >= start_year & ratio < 1]
  
  # Allow for splitting
  if(nrow(duplicateCDB) > 0){
    duplicateCDB[ timePointYears <= end_year & timePointYears >= start_year & ratio != 1, c('ics', 'Value') := list(to_code, Value * as.numeric(ratio))]
    duplicateCDB[ , total := sum(ratio), by = c('geographicAreaM49_fi', 'measuredItemISSCFC', 'measuredElement', 'timePointYears','from_code', 'start_year', 'end_year')]
    
    if(nrow(duplicateCDB[total < 1]) > 0){
      duplicateCDB[ , diff := (1-total)]
      addMissingQuantitiesCDB <- duplicateCDB[diff != 0, ]
      addMissingQuantitiesCDB[ , c('Value', 'ratio') := list(sum(Value), diff), 
                            by = c('geographicAreaM49_fi', 'measuredItemISSCFC', 'measuredElement', 
                                   'timePointYears', 'from_code', 'start_year', 'end_year')]
      addMissingQuantitiesCDB[ , c('Value', 'to_code', 'ics') := list((Value/total)*as.numeric(ratio), from_code, from_code)]
      setkey(addMissingQuantitiesCDB)
      addMissingQuantitiesCDB <- unique(addMissingQuantitiesCDB)
      duplicateCDB <- rbind(duplicateCDB, addMissingQuantitiesCDB)
      duplicateCDB[ , diff := NULL]
    }
    duplicateCDB[ , total := NULL]
  }
  changedCDB <- rbind(tochangeCDB1, duplicateCDB)
  cdbMap_new <- rbind(unchangedCDB, changedCDB) 
  cdbMap_new <- rbind(unchangedCDB, changedCDB[ics != '9999']) # 9999 is a code when the production of the commodity does not have to be considered
  cdbMap_new[ , c('from_code', 'to_code', 'start_year', 'end_year', 'ratio'):= NULL]
  # Sum by ICS, no commodities anymore
} else {
  cdbMap_new <- commodityDBIcs
}

The link table is a table to deviate flows from one ICS code to another. The flow is not expressed in code so it has to be converted through the ‘link_table_elements’ data table and the checks to make sure the 100% of the available quantity is deviated controls are coded. The deviation period is checked and the deviation is applied.


#-- Link table ----
message('fi_SUA-FBS: Link table and other uses deviations')
# Link table for special period ICS group changes (regularly updated)
link_table <- ReadDatatable("link_table")

link_table <- link_table[geographic_area_m49 %in% sessionCountry]
## Checks on link table
# quantity different from 100% allocated
link_table[ , check := sum(percentage), by=c("geographic_area_m49","flow","start_year","end_year","from_code")]

if(any(link_table$check!=1)){
  message(paste0("Not 100% of the original quantity is allocated for link:" , 
                 paste0(link_table[link_table$check!=1,], collapse = " ")))
  msg2email2 <- paste0("Not 100% of the original quantity is allocated for link:" , 
                       paste0(link_table[link_table$check!=1,], collapse = " "))
}
msg2email2 <- ifelse(any(link_table$check!=1), msg2email2, "")

# Link table expressed in terms of "PRD", "TRD", "EXP", "IMP", "ALL"
# they translate into standard measuredElement through linkCorrespondence datatable 'link_table_elements'

linkCorrespondence <- ReadDatatable('link_table_elements')
setnames(linkCorrespondence, old = 'measuredelement', new = 'measuredElement')

message('From flow to code')
link_table2 <- merge(link_table, linkCorrespondence, by = "flow", allow.cartesian = TRUE)

link_table2$end_year <- ifelse(link_table2$end_year == "LAST", max(as.numeric(cdbMap_new$timePointYears)),
                               link_table2$end_year)

link_table2 <- link_table2[end_year >= as.character(minyear)]

years <- expand.grid(as.character(1:nrow(link_table2)), 1948:year)
years <- as.data.table(years)
setnames(years, c('Var1','Var2'), c('idx', 'timePointYears'))

link_table2[ , idx := row.names(link_table2) ]
 
link_table3 <- merge(link_table2, 
                    years, by = 'idx')

link_table3 <- link_table3[timePointYears >= start_year & timePointYears <= end_year]
link_table3[ , idx := NULL]
link_table3[ ,timePointYears := as.character(timePointYears)]
# Change ICS codes
message('From table to CDB')
commodityDBLink <- merge(cdbMap_new, link_table3, 
                         by.x = c("geographicAreaM49_fi", "measuredElement", "timePointYears", "ics"),
                         by.y = c("geographic_area_m49", "measuredElement", "timePointYears","from_code"), 
                         all.x = TRUE, allow.cartesian = TRUE)

setkey(commodityDBLink)
commodityDBLink <- unique(commodityDBLink)

# Avoid NAs for periods
commodityDBLink$start_year <- ifelse(is.na(commodityDBLink$start_year), "1900", commodityDBLink$start_year)
commodityDBLink$end_year <- ifelse(is.na(commodityDBLink$end_year), "9999", commodityDBLink$end_year)

# Change ICS for defined periods
commodityDBLink[!is.na(to_code) & 
                  as.numeric(timePointYears) >= as.numeric(start_year) &
                  as.numeric(timePointYears) <= as.numeric(end_year), ics := to_code]

commodityDBLink[!is.na(percentage) , Value := Value*percentage]

# remove unnecessary dimensions
commodityDBLink <- commodityDBLink[ , c("flow", "start_year", "end_year", "percentage", "to_code", "check") := NULL]

A similar but simpler process is applied to the data with the ‘other_uses’ data table before aggregation. Value elements are also aggregated and appended to the rest.


##-- Other uses introduction ----
# Some commodities are not imported for food purpouses (e.g. "ornamental fish").
# Those flow are deviated to "other utilizations"

otherUses <- ReadDatatable('other_uses')
message('Other uses merge')
commodityDBotherUses <- merge(commodityDBLink, otherUses, 
                              by.x = c( "measuredItemISSCFC", "measuredElement", "ics"),
                              by.y = c("isscfc", "measured_element_orig", "ics"))

commodityDBotherUses$measuredElement <- ifelse(is.na(commodityDBotherUses$measured_element_dest),
                                               commodityDBotherUses$measuredElement,
                                               commodityDBotherUses$measured_element_dest)
commodityDBotherUses <- commodityDBotherUses[ , c("label", "measured_element_dest", "fias_code") := NULL]
commodityDBdeviated <- rbind(commodityDBLink, commodityDBotherUses)

# Sum by ICS, no commodities anymore

if(nrow(commodityDBdeviated)>0){
commodityDBAggr <- commodityDBdeviated[ , list(Value = sum(Value, na.rm = TRUE),
                                               flagObservationStatus = max(flagObservationStatus),
                                               flagMethod = flagMethodAss(max(flagObservationStatus))),
                                        by = list(geographicAreaM49_fi,
                                                  timePointYears,
                                                  measuredElement,
                                                  ics)]
} else {
  commodityDBAggr <- commodityDBdeviated[ , list(Value = sum(Value, na.rm = TRUE),
                                                 flagObservationStatus = flagObservationStatus,
                                                 flagMethod = flagMethod),
                                          by = list(geographicAreaM49_fi,
                                                    timePointYears,
                                                    measuredElement,
                                                    ics)]
}

tradeQ <- commodityDBAggr[measuredElement %in% c('5910', '5610')]
tradeV <- commodityDBAggr[measuredElement %in% c('5922', '5622')]
tradeQ[measuredElement == '5910', flow := 'EXP']
tradeQ[measuredElement == '5610', flow := 'IMP']
tradeV[measuredElement == '5922', flow := 'EXP']
tradeV[measuredElement == '5622', flow := 'IMP']

if(nrow(tradeQ) > 0){
tradeUV <- merge(tradeQ, tradeV, by = c('geographicAreaM49_fi',
                             'timePointYears',
                             'flow',
                             'ics'), all = T,
                 suffixes = c('Q', 'V'))

tradeUV <- tradeUV[, c('Value', 'flagObservationStatus', 'flagMethod') := 
                            list(ValueV/ValueQ, flagObservationStatusV, "i")]
tradeUV[flow == 'IMP', measuredElement := '5630' ]
tradeUV[flow == 'EXP', measuredElement := '5930' ]
tradeUV[is.nan(Value), Value := 0]
tradeUV[ValueQ == 0, Value := ValueV]


commodityDBAggrTot <- rbind(commodityDBAggr[!measuredElement %in% c('5930', '5630')], 
                            tradeUV[,.(geographicAreaM49_fi,
                                       timePointYears,
                                       measuredElement,
                                       ics, Value, 
                                       flagObservationStatus, 
                                       flagMethod)])
} else {
  commodityDBAggrTot <- commodityDBAggr
}


ValueElements <- c('5922', '5930', '5622', '5630')

The aggregated Global production and Commodity datasets are bind together to form the SUA unbalanced which is saved back into the SWS. An additional aggregation is done in case Global production and Commodity datasets have quantities for same ICS code.


# -- SUA ----

SUA <- rbind(globalProductionAggr, commodityDBAggrTot)
setnames(SUA, "ics", "measuredItemFaostat_L2")

SUA <- SUA[ , list(Value = sum(Value, na.rm = TRUE),
                   flagObservationStatus = max(flagObservationStatus),
                   flagMethod = flagMethodAss(max(flagObservationStatus))), 
            by = list(geographicAreaM49_fi,
                      timePointYears,
                      measuredElement,
                      measuredItemFaostat_L2)]
setnames(SUA, 'measuredElement', 'measuredElementSuaFbs')

SUA <- SUA[!is.na(Value)]

message("fi_SUA-FBS: Saving SUA unbalanced data")

CONFIG <- GetDatasetConfig(sessionKey_suaUnb@domain, sessionKey_suaUnb@dataset)

stats <- SaveData(domain = CONFIG$domain,
                  dataset = CONFIG$dataset,
                  data = SUA, waitTimeout = Inf)

msg2email3 <- paste0("Pull data to SUA_unbalanced process completed successfully! ",
                     stats$inserted, " observations written, ",
                     stats$ignored, " weren't updated, ",
                     stats$discarded, " had problems.")

Once the SUA unbalanced is ready, value elements are separated from the rest of the data again. The availability is the calculated for each ICS product with the help of the ‘element_sign_table’ data table


# -- Filling SUA ----

# set aside price elements
SUAValues <- SUA[measuredElementSuaFbs %in% ValueElements, ]

SUA <- SUA[! measuredElementSuaFbs %in% ValueElements, ] 

# Load datatable with element signs to calculate availability
# Equation: Prod + Imp + Stock - Exp - Feed - Seed - Loss - Processing - Food - Other util = 0

elementSignTable <- ReadDatatable('element_sign_table')
setnames(elementSignTable, 'measured_element', 'measuredElementSuaFbs')

# Now compute availability from SUA unbalanced as a first check
SUAexpanded <- merge(SUA, elementSignTable[ , .(measuredElementSuaFbs, sign)], by = "measuredElementSuaFbs", all.x = TRUE)

if(any(is.na(SUAexpanded$sign))){
  stop('There is an element in the SUA not included in the availability calculation.')
}

message("fi_SUA-FBS: Calculating availability")
SUAexpanded[, availability := sum(Value * sign, na.rm = TRUE), 
            by = list(geographicAreaM49_fi, timePointYears, measuredItemFaostat_L2)]


Negative imbalances are checked and divided into primary and secondary imbalances. Primary imbalances are notified and compensated with the ‘Residual other uses’ (5166) element for a quantity equal to the imbalance. The user will treat this imbalance in the shiny app. Secondary imbalances regarding meals are treated as primary imbalance whereas for the other the production is increased for a value equal to the imbalance. They are the stored into the ‘imbalance_tab’ data table.


# Check no negative primary availability. 
map_asfis <- ReadDatatable('map_asfis')
setnames(map_asfis, c("asfis", "ics"), c("fisheriesAsfis", "measuredItemFaostat_L2"))
setkey(map_asfis)
primary <- unique(map_asfis$measuredItemFaostat_L2)
primaryneg <- SUAexpanded[availability < 0 & measuredItemFaostat_L2 %in% primary]

if(nrow(primaryneg) > 0){
  countriesneg <- unique(primaryneg[ , .(geographicAreaM49_fi, measuredItemFaostat_L2, timePointYears)])
  msgg <- apply(countriesneg,1, paste0, collapse = ', ')
  msg2email4 <- paste0('There are negative primary availabilities. Check (country code, product code, year): ',
                       paste0(msgg, collapse = " and "))
  message(msg2email4)
}
msg2email4 <- ifelse(nrow(primaryneg) > 0, msg2email4, "")

# If primary imbalance then put imbalance to Statistical discrepancy (Residual other uses (rou) 5166)

rou <- copy(primaryneg)
rou[ , c('measuredElementSuaFbs', 
         'Value', 
         'flagObservationStatus', 
         'flagMethod', 'sign') := list('5166', availability,
                                       'I', 'i', -1)]
setkey(rou)
rou <- unique(rou)

SUAexpanded <- rbind(SUAexpanded, rou)

message("fi_SUA-FBS: Start processing negative availability")

secondaryneg0 <- SUAexpanded[availability < 0 & !measuredItemFaostat_L2 %in% primary]
setkey(secondaryneg0)
secondaryneg0 <- unique(secondaryneg0)

# Delete old imbalances stored
imbalance_store <- ReadDatatable('imbalance_tab', readOnly = FALSE)
if(nrow(imbalance_store[ geographicaream49_fi %in% unique(secondaryneg0$geographicAreaM49_fi) & timepointyears %in% yearVals, ]) > 0){
  changeset <- Changeset('imbalance_tab')
  AddDeletions(changeset, imbalance_store[ geographicaream49_fi %in% unique(secondaryneg0$geographicAreaM49_fi) & timepointyears %in% yearVals, ])
  Finalise(changeset)
}

# Add new imbalances
secondarynegCompliant <- copy(secondaryneg0)
secondarynegCompliant <- secondarynegCompliant[ , .(geographicAreaM49_fi, measuredItemFaostat_L2,
                                                    timePointYears, availability)]
setkey(secondarynegCompliant)
secondarynegCompliant <- unique(secondarynegCompliant)

setnames(secondarynegCompliant,
         c('geographicAreaM49_fi', 'timePointYears',
           'measuredItemFaostat_L2'),
         c('geographicaream49_fi', 'timepointyears',
           'measureditemfaostat_l2'))

changeset <- Changeset('imbalance_tab')
AddInsertions(changeset, secondarynegCompliant)
Finalise(changeset)

if(nrow(secondaryneg0) > 0){
  
  countriessecneg <- unique(secondaryneg0[ , .(geographicAreaM49_fi, measuredItemFaostat_L2, timePointYears)])
  msgg2 <- apply(countriessecneg,1, paste0, collapse = ', ')
  
  msg2email5 <- paste0('There are negative secondary availabilities. Check (country code, product code, year): : ',
                       paste0(msgg2, collapse = " and "))
  message(msg2email5)
}
msg2email5 <- ifelse(nrow(secondaryneg0) > 0, msg2email5, "")

# Get meals codes
mealCodes <- GetCodeList("FisheriesCommodities", 
                         "fi_sua_balanced_validated",
                         "measuredItemFaostat_L2")[ grepl('meals', description)]$code

if(any(secondaryneg0$measuredItemFaostat_L2 %in% mealCodes)){
  mealsUnbal <- secondaryneg0[measuredItemFaostat_L2 %in% mealCodes]
  message('Unbalance for meal products!')
  secondaryneg <- secondaryneg0[!measuredItemFaostat_L2 %in% mealCodes]
} else {
  secondaryneg <- secondaryneg0
    }

rouMeals <- copy(secondaryneg0[measuredItemFaostat_L2 %in% mealCodes])
rouMeals[ , c('measuredElementSuaFbs', 
         'Value', 
         'flagObservationStatus', 
         'flagMethod', 'sign') := list('5166', availability,
                                       'I', 'i', -1)]
setkey(rouMeals)
rouMeals <- unique(rouMeals)

# secondaryneg is secondary imbalances without meals
if(nrow(secondaryneg) > 0){
  # Make sure all production (5510) values have been imputed
  icsneg <- unique(secondaryneg$measuredItemFaostat_L2)
  setkey(secondaryneg, geographicAreaM49_fi, timePointYears,  measuredItemFaostat_L2, availability)
  prod2add <- unique(secondaryneg[ , .(geographicAreaM49_fi, timePointYears,  measuredItemFaostat_L2, availability) ])
  
  # add production element with NA values and flags then estimate as in Francesca code with estimation flags
  prod2add[ , ':=' (measuredElementSuaFbs = '5510', Value = - availability,
                    flagObservationStatus = as.factor('I'), flagMethod = 'i', sign = 1)]
  
  # SUA with all production values
  SUAwithProdupd <- merge(secondaryneg, prod2add, by = c('geographicAreaM49_fi',
                                                         'timePointYears',
                                                         'measuredItemFaostat_L2',
                                                         'availability',
                                                         'measuredElementSuaFbs'),
                          suffixes = c('', '_added'), all = TRUE)
  SUAwithProdupd$sign_added <- as.integer(SUAwithProdupd$sign_added)
  
  # add production to delete negative availability
  SUAwithProdupd[measuredElementSuaFbs == '5510' , c("Value", "sign", 
                                                     "flagObservationStatus",
                                                     "flagMethod") := list(ifelse(is.na(Value), Value_added, 
                                                                                  Value+Value_added),
                                                                           sign_added,
                                                                           flagObservationStatus_added,
                                                                           flagMethod_added)]
  SUAcomplement <- SUAexpanded[!secondaryneg, on = names(secondaryneg)]
  SUAcomplement <- rbind(SUAcomplement, rouMeals)
  
  # Putting together values with negative and positive availability which had been separated before
  SUAwithProd <- rbind(SUAwithProdupd[ , .(geographicAreaM49_fi, timePointYears,
                                           measuredItemFaostat_L2, availability,
                                           measuredElementSuaFbs, Value,
                                           flagObservationStatus, flagMethod, sign)],
                       SUAcomplement)
} else {
  SUAwithProd <- SUAexpanded
#  SUAwithProd[ , sign := NULL ]
}

After these adjustments availability is calculated again and some validated values are pulled from the validated SUA balanced dataset in order to apply extraction rates and food ratio to the current data.


# Recalculate availability after adjustments

SUAwithProd[, availability := sum(Value * sign, na.rm = TRUE), 
            by = list(geographicAreaM49_fi, timePointYears, measuredItemFaostat_L2)]

SUAwithProd[ , sign := NULL]

# Add input value if present
KeySUAinput <- DatasetKey(domain = "Fisheries Commodities", dataset = "fi_sua_balanced_validated", # TO SUBSTITUTE WITH 'fi_sua_balanced_validated' 
                                                                                                # and erase the /10000 line
                          dimensions = list(
                            geographicAreaM49_fi = Dimension(name = "geographicAreaM49_fi", keys = sessionCountry),
                            measuredItemFaostat_L2 = Dimension(name = "measuredItemFaostat_L2", keys = GetCodeList("FisheriesCommodities", "fi_sua_balanced_validated","measuredItemFaostat_L2" )[,code]),
                            measuredElementSuaFbs = Dimension(name = "measuredElementSuaFbs", keys = c('5510', '5141', '5423')), # GetCodeList("FisheriesCommodities", "fi_sua_balanced_validated","measuredElementSuaFbs" )[,code]),
                            timePointYears = Dimension(name = "timePointYears", keys = as.character((minyear -1):maxyear))))

SUAstored <- GetData(KeySUAinput)
SUAvalEr <- SUAstored[measuredElementSuaFbs == '5423' ]

SUAwithInput <- SUAwithProd #rbind(SUAwithProd, SUAstoredInput)
setkey(SUAwithInput)
SUAwithInput <- unique(SUAwithInput)

For primary items the food/production ratio for the latest available year is computed and applied to the current production for groups that need balancing. This is to avoid to assign all the food processing to the primary products. This process is applied only to products having food as balancing element.

# Calculate ratio of food to assign to primary elements
SUAvalProd <- SUAstored[measuredItemFaostat_L2 %in% primary & measuredElementSuaFbs == '5510' & timePointYears == as.character(max(as.numeric(yearVals)) - 1)]
SUAvalFood <- SUAstored[measuredItemFaostat_L2 %in% primary & measuredElementSuaFbs == '5141' & timePointYears == as.character(max(as.numeric(yearVals)) - 1)]


foodShare <- merge(SUAvalProd, SUAvalFood, by = c('geographicAreaM49_fi', 
                                                  'measuredItemFaostat_L2', 
                                                  'timePointYears'),
                   suffixes = c('Prod', 'Food'), all = TRUE)

foodShare[ , perc := (ValueFood/ValueProd), by = c('geographicAreaM49_fi', 
                                                           'measuredItemFaostat_L2', 
                                                           'timePointYears')]
# At least 3% is dedicated to Food
foodShare[ , perc := ifelse(perc < 0.03 | is.na(perc), 0.03, perc)]

balancingElements <- ReadDatatable('balancing_elements')

setnames(balancingElements, names(balancingElements), c("geographicAreaM49_fi", 
                                                        "measuredItemFaostat_L2",
                                                        "measuredElementSuaFbs",
                                                        "start_year", "end_year", "share"))

# Pull ICS whose balancing element is food for the country
onlyfood <- balancingElements[!measuredItemFaostat_L2 %in% primary & geographicAreaM49_fi %in% sessionCountry & measuredElementSuaFbs == '5141']
# For secondary groups at least 3% is dedicated to Food
onlyfood[ , perc := 0.03]
SUAvalFoodSec <- onlyfood[ , .(geographicAreaM49_fi, measuredItemFaostat_L2, perc)]

# Take primary food
foodShare2apply1 <- foodShare[!is.na(perc) & perc != Inf & perc <= 1 ]
foodShare2apply1 <- foodShare2apply1[ , .(geographicAreaM49_fi, measuredItemFaostat_L2, perc)]

# Put together primary and secondary food
foodShare2apply <- rbind(foodShare2apply1, SUAvalFoodSec)

Extraction rate and input calculations are processes needing external functions. Once the extraction rates are calculated the new commodity tree with the updated extraction rates is built and used in the following parts of the code. Availability is also calculated again considering the food element calculated earlier.


# -- ER & input calc ----

message("fi_SUA-FBS: Calculating extraction rates")

tree <- ReadDatatable('fi_commodity_tree')
treePrim <- copy(tree)
treePrim <- treePrim[parent %in% primary ]
SUAwithEr <- eRcomputation(data = SUAwithInput, tree = treePrim, 
                           years = yearVals, oldEr = SUAvalEr)

message("fi_SUA-FBS: Calculating input element")
# If input available then eR is calculated dividing prod/input => input = prod/eR 
# so the result is the same, input are all calculated
SUAinput <- inputComputation(data = SUAwithEr)

newTree <- merge(tree, unique(SUAinput[ measuredElementSuaFbs == '5423' & !is.na(Value),
                                        .(geographicAreaM49_fi, timePointYears, measuredItemFaostat_L2, Value)]), 
                 by.x = 'child', by.y = 'measuredItemFaostat_L2', all.x = TRUE, allow.cartesian = TRUE)
newTree[ , extraction_rate := Value ]
newTree[ , Value:= NULL]

#-- Recalculate availability using food imputation for primary products ----
food1 <- merge(SUAinput[availability > 0 & measuredElementSuaFbs == '5510'], 
               foodShare2apply, 
               by = c('geographicAreaM49_fi', 'measuredItemFaostat_L2'), 
               all.x = TRUE )

food1[ , c('measuredElementSuaFbs', 
           'flagObservationStatus',
           'flagMethod'):= list('5141', 'E', 'b')]

food2 <- food1[!is.na(Value) & !is.na(perc), ValueFood := Value*perc]
food2 <- food2[ , c('Value','perc') := NULL]
food2 <- food2[!is.na(ValueFood)]
# If the required percentage is available it is assigned
# Otherwise the food processing calculation will assign it
# according to availability
food <- food2[ availability < ValueFood, ValueFood := availability]
food <- food[ , ValueFood := ValueFood * 0.95]
setnames(food, 'ValueFood', 'Value')

SUAFood <- rbind(SUAinput, food)

SUAFood <- merge(SUAFood, 
                 elementSignTable[ , .(measuredElementSuaFbs, sign)], 
                 by = "measuredElementSuaFbs", 
                 all.x = TRUE)

SUAFood[, availability := sum(Value*sign, na.rm = TRUE), 
        by = list(geographicAreaM49_fi, timePointYears, measuredItemFaostat_L2)]

SUAFood[ , sign := NULL ]

Food processing element is calculated with an external function as well. When the food processing is ready, the problems are stored in datatables that the shiny reads. The imbalances resulting impossible to cover even with secondary production are assigned as food processing to the corresponding primary element and balanced by an equal quantity as ‘Residual other uses’ (5166). The user has to address the imbalance in the shiny app. Elements are summed to avoid duplicates. The food processing is calculated twice, once including the imputed food and once re-assigning the food part to production. The combination leading to less imbalances and best continuity in terms of food quantity assigned per product is chosen.


# --Food processing ----

message("fi_SUA-FBS: Calculating food processing")
FPdata_alltest <- foodProcessingComputation(SUAinput = SUAFood, treeNewER = newTree, primary = primary)
FPdatatest <- FPdata_alltest$result
FPdatatest <- FPdatatest[Value != 0]
FPproblemstest <- list(primary = data.table(),
                       secondaryTot = data.table(),
                       secondary = data.table(),
                       tertiary = data.table(),
                       quaternary = data.table(),
                       NotCovered = data.table())

FPproblemstest <- FPdata_alltest$problems

# Change of input to calculate FP for problematic data ----

# Get problematic groups
if(any(sapply(FPproblemstest, nrow)>0)){
# if(length(FPproblemstest) > 1 & nrow(FPproblemstest$NotCovered) > 0){
avoidProblems <- rbindlist(FPproblemstest, fill = TRUE)
avoidProblems <- unique(avoidProblems[ , .(geographicAreaM49_fi,
                                           timePointYears,
                                           parent_primary)])

# Parent-child tree
treeneeded0 <- data.table(parent = unique(avoidProblems$parent_primary),
                          child = unique(avoidProblems$parent_primary))
treeneeded <- unique(newTree[parent %in% unique(avoidProblems$parent_primary), .(parent, child) ])
treeneeded <- rbind(treeneeded, treeneeded0)

# Get complete structure of problematic groups
avoidProblems2 <- merge(avoidProblems, treeneeded,
                        by.x = 'parent_primary',
                        by.y = 'parent', all.x = TRUE,
                        allow.cartesian = TRUE)
setnames(avoidProblems2, 'child', 'measuredItemFaostat_L2')

# The problematic groups with food are recalculated without food
subst <- merge(SUAinput, avoidProblems2[ , .(geographicAreaM49_fi,
                                             timePointYears,
                                             measuredItemFaostat_L2)],
               by = c("geographicAreaM49_fi",
                      "timePointYears", 
                      "measuredItemFaostat_L2"))

# The part that was okay with food is recalculated without the problematic part
cancel <- merge(SUAFood, avoidProblems2[ , .(geographicAreaM49_fi,
                                             timePointYears,
                                             measuredItemFaostat_L2)],
                by = c("geographicAreaM49_fi",
                       "timePointYears", 
                       "measuredItemFaostat_L2"))

SUAFoodcan <- SUAFood[!cancel, on = names(SUAFood)]

if(nrow(SUAFoodcan) > 0){
FPdata_all1 <- foodProcessingComputation(SUAinput = SUAFoodcan, treeNewER = newTree, primary = primary)
FPdata1 <- FPdata_all1$result
FPdata1 <- FPdata1[Value != 0]
FPproblems1 <- list(primary = data.table(),
                    secondaryTot = data.table(),
                    secondary = data.table(),
                    tertiary = data.table(),
                    quaternary = data.table(),
                    NotCovered = data.table())
FPproblems1 <- FPdata_all1$problems
} else {
  FPdata1 <- data.table()
  FPproblems1 <- list(primary = data.table(),
                      secondaryTot = data.table(),
                      secondary = data.table(),
                      tertiary = data.table(),
                      quaternary = data.table(),
                      NotCovered = data.table())
}
# FP calculated for problematic elements
FPdata_all2 <- foodProcessingComputation(SUAinput = subst,
                                         treeNewER = newTree,
                                         primary = primary)

FPdata2 <- FPdata_all2$result
FPdata2 <- FPdata2[Value != 0]
FPproblems2 <- list(primary = data.table(),
                    secondaryTot = data.table(),
                    secondary = data.table(),
                    tertiary = data.table(),
                    quaternary = data.table(),
                    NotCovered = data.table())
FPproblems2 <- FPdata_all2$problems

# Put together results and dataset to consider
FPdata <- rbind(FPdata1, FPdata2)
FPproblems <- FPproblems2
SUAnoFP <- rbind(SUAFoodcan, subst)
} else {SUAnoFP <- SUAFood
FPdata <- FPdatatest
FPproblems <- list(primary = data.table(),
                   secondaryTot = data.table(),
                   secondary = data.table(),
                   tertiary = data.table(),
                   quaternary = data.table(),
                   NotCovered = data.table())}

message('Food re-processing okay')
############

FPdata <- FPdata[ , availability := NULL ]
FPdata[ , c("flagObservationStatus", "flagMethod") := list("E", "i")]

SUAnoFP[ , availability := NULL] # SUAFood[ , availability := NULL]
SUAunbal <- rbind(SUAnoFP[!is.na(Value), ], FPdata)
SUAunbal$flagObservationStatus <- as.character(SUAunbal$flagObservationStatus)

if(is.list(FPproblems$NotCovered) & nrow(FPproblems$NotCovered) > 0){
  uncovered <- copy(FPproblems$NotCovered)
  uncovered[ , measuredElementSuaFbs := '5023']
  setnames(uncovered, c('parent_primary', 'UncoveredQuantity'),
           c('measuredItemFaostat_L2', 'Value'))
  
  rouUncovered <-copy(uncovered) 
  rouUncovered[ , measuredElementSuaFbs := '5166']
  rouUncovered[ , Value := -Value]
  
  uncoveredAdjusted <- rbind(uncovered, rouUncovered) 
  uncoveredAdjusted[ , c("flagObservationStatus", "flagMethod") := list("E", "i")]
  uncoveredAdjusted[ measuredElementSuaFbs == '5166' , c("flagObservationStatus", "flagMethod") := list("I", "i")]
  
} else {
  uncoveredAdjusted <- data.table()
}

SUAunbal <- rbind(SUAunbal, uncoveredAdjusted)
SUAunbal$flagObservationStatus <- factor(SUAunbal$flagObservationStatus, levels = c('M', 'O', 'N', '', 'X', 'T', 'E', 'I'), ordered = TRUE)

SUAunbal <- SUAunbal[ , list(Value = sum(Value, na.rm = TRUE),
                             flagObservationStatus = max(flagObservationStatus),
                             flagMethod = flagMethodAss(max(flagObservationStatus))), 
              by = c("geographicAreaM49_fi", "timePointYears",
                     "measuredItemFaostat_L2", "measuredElementSuaFbs")]

setkey(SUAunbal)
SUAunbal <- unique(SUAunbal)

The balancing process can start and the current new availability is calculated with the usual data table. The availability is assigned to the appropriate balancing element for each product via the ‘balancing_elements’ data table. Any encountered balancing problem is stored into the ‘balancing_problems_tab’ data table.


# -- Balancing ----

SUAunbal <-  merge(SUAunbal, elementSignTable[, .(measuredElementSuaFbs, sign)], by = "measuredElementSuaFbs", all.x = TRUE)

if(any(is.na(SUAunbal$sign))){
  stop('There is an element in the SUA not included in the availability calculation.')
}

# Calculate imbalance
SUAunbal[ , availability := sum(Value * sign, na.rm = TRUE), 
          by = list(geographicAreaM49_fi, timePointYears, measuredItemFaostat_L2)]
SUAunbal[ , availability := round(availability, 6)]
message('fi_SUA-FBS: Pulling balancing elements')

currentYear <- as.numeric(gsub("\\-[0-9]*", "", Sys.Date()))
balancingElements[ end_year == "LAST"]$end_year <- as.character(currentYear)

balancingValues <- unique(SUAunbal[ , .(geographicAreaM49_fi, timePointYears , measuredItemFaostat_L2, availability) ])

# assign imbalance to balancing elements
balancing <- merge(balancingElements, 
                   balancingValues, by = c("geographicAreaM49_fi","measuredItemFaostat_L2"), # [availability != 0]
                   all.y = TRUE)
setnames(balancing, c("availability"), c("Value"))

# Controll all imbalances have a balancing element
if(any(is.na(balancing$measuredElementSuaFbs)) & any(balancing[is.na(measuredElementSuaFbs)]$availability != 0)){
  message('Balancing elements missing!')
  message(balancing[is.na(measuredElementSuaFbs) & availability != 0])
}

balancing2merge <- balancing[ as.numeric(timePointYears) >= as.numeric(start_year) & as.numeric(timePointYears) <= as.numeric(end_year), Value := Value*share]
balancing2merge[ , c('start_year', 'end_year', 'share') := NULL]
balancing2merge[ , c('flagObservationStatus', 'flagMethod') := list('E','b')]
# Balancing cannot be negative
balancingproblems <- balancing2merge[round(Value,6) < 0,]

# Store balancing problems
balancingproblems_store <- ReadDatatable('balancing_problems_tab', readOnly = FALSE)
if(nrow(balancingproblems_store[ geographicaream49_fi %in% sessionCountry & timepointyears %in% yearVals, ]) > 0){
  changeset <- Changeset('balancing_problems_tab')
  AddDeletions(changeset, balancingproblems_store[ geographicaream49_fi %in% sessionCountry & timepointyears %in% yearVals, ])
  Finalise(changeset)
}

# Add new imbalances
if(is.list(FPproblems$NotCovered) & length(FPproblems$NotCovered) > 0){
  toupload <- copy(FPproblems$NotCovered)
  toupload[ , measuredElementSuaFbs := '5023']
  setnames(toupload, c('parent_primary', 'UncoveredQuantity'),
           c('measuredItemFaostat_L2', 'Value'))
  balancingproblemsCompliant <- rbind(toupload, balancingproblems[ , .(geographicAreaM49_fi, measuredItemFaostat_L2,
                                                                       timePointYears, measuredElementSuaFbs, Value)])
} else {
  balancingproblemsCompliant <- rbind(balancingproblems[ , .(geographicAreaM49_fi, measuredItemFaostat_L2,
                                                             timePointYears, measuredElementSuaFbs, Value)])
}

if(nrow(balancingproblemsCompliant) > 0){
  setkey(balancingproblemsCompliant)
  balancingproblemsCompliant <- unique(balancingproblemsCompliant)
  
  setnames(balancingproblemsCompliant,
           c('geographicAreaM49_fi', 'timePointYears',
             'measuredItemFaostat_L2', 'measuredElementSuaFbs', 'Value'),
           c('geographicaream49_fi', 'timepointyears',
             'measureditemfaostat_l2', 'measuredelementsuafbs', 'value'))
  
  changeset <- Changeset('balancing_problems_tab')
  AddInsertions(changeset, balancingproblemsCompliant)
  Finalise(changeset)
}

# if negative balancing element then balance imbalance with 5166
balancingimb <- copy(balancing2merge[Value < 0])
balancingimb <- balancingimb[Value < 0, c('measuredElementSuaFbs', 
                                          'flagObservationStatus', 
                                          'flagMethod') := list('5166', 'I', 'i')]
balancingimb[ , Value := Value]

balancingTot <- rbind(balancing2merge[Value > 0], balancingimb)

The just calculated values for balancing elements are summed to those already computed (typically food) and availability is calculated again to make sure the balancing process has worked and all availability figures are null.

SUAbal <- merge(SUAunbal[ , .(geographicAreaM49_fi, timePointYears , 
                              measuredItemFaostat_L2, measuredElementSuaFbs, 
                              Value, flagObservationStatus, flagMethod)], 
                balancingTot,
                by = c('geographicAreaM49_fi', 'timePointYears', 
                       'measuredItemFaostat_L2', 'measuredElementSuaFbs'),
                suffixes = c('','Bal'), 
                all = TRUE)

SUAbal[is.na(ValueBal), ValueBal := 0 ]
SUAbal[is.na(Value), Value := 0 ]
SUAbal[ , Value := Value + ValueBal ]
SUAbal$flagObservationStatus <- as.character(SUAbal$flagObservationStatus)
SUAbal[is.na(flagObservationStatus) , flagObservationStatus := 'E']
SUAbal[is.na(flagMethod) , flagMethod := 'b']
SUAbal <- SUAbal[ , c('ValueBal', 
                      'flagObservationStatusBal',
                      'flagMethodBal') := NULL]

# Re-calculate availability
SUAbalAvail <- merge(SUAbal, elementSignTable[, .(measuredElementSuaFbs, sign)], by = "measuredElementSuaFbs", all.x = TRUE)

SUAbalAvail[, availability := sum(Value * sign, na.rm = TRUE), 
            by = list(geographicAreaM49_fi, timePointYears, measuredItemFaostat_L2)]

if(any(round(SUAbalAvail$availability) < 0)){
  message("fi_SUA-FBS: Negative availability for some products.")
  msg2email7 <- paste0('Negative availability for products:', 
                       paste0(unique(SUAbalAvail[round(availability) != 0, ]$measuredItemFaostat_L2), collapse = ", "),
                       'for country', paste0(unique(SUAbalAvail[round(availability) != 0, ]$geographicAreaM49_fi), collapse = ", "))
  
}

msg2email7 <- ifelse(any(round(SUAbalAvail$availability) < 0), msg2email7, "")

SUAbalAvail[, c("sign", "availability"):=NULL]

Now all available quantity has been assigned the nutrition values are calculated for food quantities with the ‘fishery_nutrient’ data table and bind to the SUA.


# -- SUA with nutrient ----

# Read NutrientFactors
nutrientFactors <- ReadDatatable("fishery_nutrient")
nutrientFactors$calories <- as.numeric(nutrientFactors$calories)
nutrientFactors$proteins <- as.numeric(nutrientFactors$proteins)
nutrientFactors$fats <- as.numeric(nutrientFactors$fats)
nutrientFactors[is.na(proteins), proteins := 0]

SUA_with_nutrient <- merge(SUAbalAvail, nutrientFactors, by.x = "measuredItemFaostat_L2", by.y = "ics", all.x = TRUE)

SUA_with_nutrient[measuredElementSuaFbs=="5141", calories:=Value*calories/100]
SUA_with_nutrient[measuredElementSuaFbs=="5141", proteins:=Value*proteins/100]
SUA_with_nutrient[measuredElementSuaFbs=="5141", fats:=Value*fats/100]
SUA_with_nutrient[measuredElementSuaFbs!="5141",`:=`(c("calories", "proteins", "fats"),list(0,0,0) )]

# Get "calories", "proteins" and "fats" and make them in the dataset format
SUAnutrients <-  melt.data.table(SUA_with_nutrient[measuredElementSuaFbs=="5141", ],
                                 id.vars = c('geographicAreaM49_fi', 'measuredItemFaostat_L2', 'timePointYears'),
                                 measure.vars = c('calories', 'proteins','fats'),
                                 variable.name = 'measuredElementSuaFbs', value.name = 'Value')
SUAnutrients$measuredElementSuaFbs <- as.character(SUAnutrients$measuredElementSuaFbs)
SUAnutrients$measuredElementSuaFbs <- ifelse(SUAnutrients$measuredElementSuaFbs == 'calories', '261',
                                             ifelse(SUAnutrients$measuredElementSuaFbs == 'proteins', '271',
                                                    ifelse(SUAnutrients$measuredElementSuaFbs == 'fats', '281', SUAnutrients$measuredElementSuaFbs)))

SUAnutrients[ , c('flagObservationStatus', 'flagMethod') := list('E','i')]
SUAnutrients <- unique(SUAnutrients)
foodOnly <- SUA_with_nutrient[measuredElementSuaFbs=="5141", .(measuredItemFaostat_L2, measuredElementSuaFbs,
                                                               geographicAreaM49_fi, timePointYears, Value,
                                                               flagObservationStatus, flagMethod)]

SUAnutrients <- rbind(SUAnutrients, foodOnly)

Population data are loaded from the ‘population_unpd’ dataset to compute per caput nutrition values. Once calculated the corresponding element code is assigned


# Get population data
elemKeys <- "511"

keyPop <- DatasetKey(domain = "population", dataset = "population_unpd", dimensions = list(
  geographicAreaM49 = Dimension(name = "geographicAreaM49", keys = sessionCountry),
  measuredElement = Dimension(name = "measuredElement", keys = elemKeys),
  timePointYears = Dimension(name = "timePointYears", keys = yearVals)
))

popSWS <- GetData(keyPop)
setnames(popSWS,c("geographicAreaM49", "measuredElement"),c("geographicAreaM49_fi", "measuredElementSuaFbs"))

sourceMetaData0 <- GetMetadata(keyPop)
sourceMetaData <- sourceMetaData0[grepl("WPP", Metadata_Value),]
sourceMetaData <- sourceMetaData[Metadata == 'GENERAL', ]
setnames(sourceMetaData, c("geographicAreaM49", "measuredElement"),c("geographicAreaM49_fi", "measuredElementSuaFbs"))

# Calculate per capita elements
SUAnutrientCapita <- merge(SUAnutrients, popSWS, by=c("geographicAreaM49_fi","timePointYears"), suffixes = c("","_pop"))  
SUAnutrientCapita[measuredElementSuaFbs !="5141" , Value := (Value*1000)/(Value_pop*365)]
SUAnutrientCapita[measuredElementSuaFbs =="5141" , Value := Value/Value_pop]
SUAnutrientCapita <- SUAnutrientCapita[ , .(geographicAreaM49_fi,
                                            timePointYears,
                                            measuredItemFaostat_L2,
                                            measuredElementSuaFbs,
                                            Value, flagObservationStatus,
                                            flagMethod)]

SUAnutrientCapita[measuredElementSuaFbs=="261",measuredElementSuaFbs:="264"]
SUAnutrientCapita[measuredElementSuaFbs=="281",measuredElementSuaFbs:="284"]
SUAnutrientCapita[measuredElementSuaFbs=="271",measuredElementSuaFbs:="274"]
SUAnutrientCapita[measuredElementSuaFbs=="5141",measuredElementSuaFbs:="645"]

SUA_with_nutrient[ , c('calories', 'proteins','fats') := NULL] 

# bind SUA with "calories", "proteins" and "fats" elements
SUAallNutr <- rbind(SUAnutrients[measuredElementSuaFbs!="5141"], SUAnutrientCapita)
SUANoPop <- rbind(SUA_with_nutrient, SUAallNutr)
Pop2include <- merge(unique(SUANoPop[ , .(measuredItemFaostat_L2,
                                          geographicAreaM49_fi,
                                          timePointYears)]), popSWS, by = c('geographicAreaM49_fi', 
                                                                            'timePointYears'))

The version of the World Population Prospect (WPP) used is saved as metadata in the SUA balanced dataset.


# Metadata
Metadata2include <- merge(unique(SUANoPop[ , .(measuredItemFaostat_L2,
                                               geographicAreaM49_fi,
                                               timePointYears)]), sourceMetaData, by = c('geographicAreaM49_fi', 
                                                                                         'timePointYears'),
                          allow.cartesian = TRUE)

The SUA balanced with nutrients, the SUA for value elements and the population data are bind together and saved into the SUA balanced dataset.


# add population
SUAwithPop <- rbind(SUANoPop, Pop2include)
# add values
SUAwithValues <- rbind(SUAwithPop, SUAValues)

SUA2save <- SUAwithValues[!is.na(Value)]

# -- Saving SUA balanced ----

message("fi_SUA-FBS: Saving SUA balanced data")
CONFIG2 <- GetDatasetConfig(sessionKey_suabal@domain, sessionKey_suabal@dataset)

stats2 <- SaveData(domain = CONFIG2$domain,
                   dataset = CONFIG2$dataset,
                   data = SUA2save[measuredElementSuaFbs!= "645" ],
                   metadata = Metadata2include, waitTimeout = Inf)

msg2email8 <- paste0("SUA food processing, balancing and nutrient inclusion processes completed successfully! ",
                     stats2$inserted, " observations written, ",
                     stats2$ignored, " weren't updated, ",
                     stats2$discarded, " had problems.")

The standardization process can now start. The first step of the Faostat and FIAS standardizations are common, they differentiate further on in the plugin. First of all the commodity tree is rebuilt with the updated extraction rates and including the primary products with extraction rate equal to one. Note, in this phase the extraction rates are referred to as conversion factors. The appropriate quantities are converted into primary equivalent with the conversion factors and then, whereas for primary products all elements are kept, for secondary the input and production are discarded since they are included into the food processing elements of hierarchically higher products. Quantities not converted with conversion factors are: input, population and nutritional values. And the value elements are also discarded.


# -- Standardisation & FBS ----

# get all conversion factors (or extration rates) from commodity tree
message('Get commodity tree')
tree <- ReadDatatable('fi_commodity_tree')

extrRates <- unique(SUA2save[ measuredElementSuaFbs == '5423', .(measuredItemFaostat_L2, geographicAreaM49_fi, timePointYears, Value)])
compareEr <- merge(unique(tree[parent %in% primary , .(child, extraction_rate)]), extrRates, by.x = 'child', by.y = 'measuredItemFaostat_L2', all.y = TRUE)
compareEr[is.na(Value), Value := extraction_rate]

updatedEr <- compareEr[ , .(child, Value, geographicAreaM49_fi, timePointYears)]
updatedtree <- merge(unique(tree[parent %in% primary , .(parent, child, weight)]), updatedEr, by= 'child', all.y = TRUE, allow.cartesian = TRUE)
setkey(updatedtree)
convFact <- unique(updatedtree[weight == TRUE & !is.na(Value) , .(geographicAreaM49_fi, timePointYears, parent, child, Value)])

# For primary product add conversion factor equal 1
primaryTree1 <- data.table(geographicAreaM49_fi = rep(unique(SUA2save$geographicAreaM49_fi), each = length(unique(SUA2save$timePointYears))), 
                           timePointYears = rep(unique(SUA2save$timePointYears), length(unique(SUA2save$geographicAreaM49_fi))) )

primaryTree2 <- data.table(geographicAreaM49_fi = rep(unique(SUA2save$geographicAreaM49_fi), each = length(unique(primary))), parent = primary)
primaryTree <- merge(primaryTree1, primaryTree2, by = 'geographicAreaM49_fi', allow.cartesian = TRUE)  
primaryTree[ , `:=` (child = parent, Value = 1)]
convFact <- rbind(convFact, primaryTree)
setnames(convFact, new = 'extraction_rate', old = 'Value')

# SUA with standardized element, no zero weight elements

SUA2save <- SUA2save[!measuredElementSuaFbs %in% ValueElements]
SUAstand_prep <- merge(SUA2save, convFact, by.x = c('geographicAreaM49_fi', 'timePointYears', 'measuredItemFaostat_L2'),
                       by.y = c('geographicAreaM49_fi', 'timePointYears', 'child'))
SUAstand_prep <- SUAstand_prep[!is.na(Value)]
setkey(SUAstand_prep)
SUAstand_prep <- unique(SUAstand_prep)
# Standardised value is Value/eR except from input values which are already in primary equivalent
SUAstand_prep[! measuredElementSuaFbs %in% c('5302', '261', '264', '271', '274', '281', '284', '511'), Value_stand := Value/extraction_rate]
SUAstand_prep[measuredElementSuaFbs %in% c('5302', '261', '264', '271', '274', '281', '284', '511'), Value_stand := Value]

# take only all primary elements and for secondary exclude production and input
SUAstand <-  rbind(SUAstand_prep[measuredItemFaostat_L2 %in% primary, ], 
                   SUAstand_prep[!measuredElementSuaFbs %in% c('5510', '5302') & !measuredItemFaostat_L2 %in% primary, ])

Now all quantities are standardized, for Faostat standardization all the elements but population are summed by primary parent so to have only the major groups and the actual major group code according to the ‘measuredItemFaostat_L2’ dimension is overwritten to the primary ICS code with the data table ‘faostatl2_to_faostatl1’.


# -- FAOSTAT standardization ----

# Aggregate SUA by parent meals included for FAOSTAT
SUAstandAggr0 <- SUAstand[ , .(measuredElementSuaFbs, geographicAreaM49_fi, timePointYears,
                               flagObservationStatus, flagMethod, parent, Value_stand)]

# SUAstandAggr0$flagObservationStatus <- factor(SUAstandAggr0$flagObservationStatus, levels = c('M', 'O', 'N', '', 'X', 'T', 'E', 'I'), ordered = TRUE)

SUAstandAggr1 <- SUAstandAggr0[measuredElementSuaFbs != '511' , list(Value = sum(Value_stand, na.rm = TRUE),
                                                                     flagObservationStatusAggr = 'I',
                                                                     flagMethodAggr = 's'), 
                               by = c("measuredElementSuaFbs", "geographicAreaM49_fi", "timePointYears", "parent")]

setnames(SUAstandAggr1, c('parent', 'flagObservationStatusAggr', 'flagMethodAggr'), 
         c('measuredItemFaostat_L2', 'flagObservationStatus', 'flagMethod'))

Pop2SUAaggr <- SUAstandAggr0[measuredElementSuaFbs == '511']
setkey(Pop2SUAaggr)
Pop2SUAaggr <- unique(Pop2SUAaggr)
setnames(Pop2SUAaggr, c('Value_stand', 'parent'), c('Value', 'measuredItemFaostat_L2'))
SUAstandAggr <- rbind(SUAstandAggr1, Pop2SUAaggr)

IcsGroups <- unique(SUAstandAggr[ , .(geographicAreaM49_fi, timePointYears, measuredItemFaostat_L2)])
faostat_pop2merge <- merge(IcsGroups, popSWS, by=c("geographicAreaM49_fi","timePointYears"), all = TRUE) 
faostatfbsPOP <- rbind(SUAstandAggr, faostat_pop2merge)

# map for FBS groups

sua_fbs_mapping <- ReadDatatable('faostatl2_to_faostatl1')
setnames(sua_fbs_mapping,  'measureditemfaostat_l2', 'measuredItemFaostat_L2' )
sua_fbs_mapping[ , label := NULL]

fbsFaostatL1faostat <- merge(faostatfbsPOP, sua_fbs_mapping, by = "measuredItemFaostat_L2")
fbsFaostatL1faostat[ , measuredItemFaostat_L2 := NULL]
setnames(fbsFaostatL1faostat, "fbs", "measuredItemFaostat_L2")

With the ‘fi_faostat_standardization_element’ data table the elements are aggregated by major groups and saved into the SWS in the Faostat FBS dataset.


#-- FAOSTAT FBS standardization ----

message("Starting Faostat standardization")
faostatGroups <- ReadDatatable('fi_faostat_standardization_element')
setnames(faostatGroups, c('measuredelementsuafbs', 'measuredelementfaostat'), c('measuredElementSuaFbs', 'measuredElementFaostat'))

FBSfaostat0 <- merge(fbsFaostatL1faostat[!measuredElementSuaFbs %in% c('261', '271', '281')], # c('261', '271', '281') elements not in FBS
                     faostatGroups, by = "measuredElementSuaFbs")
FBSfaostat1 <- FBSfaostat0[measuredElementFaostat != '511' , list(Value = sum(Value, na.rm = TRUE),
                                                                  flagObservationStatus = 'I',
                                                                  flagMethod = 's'), by = c("geographicAreaM49_fi",
                                                                                            "timePointYears", "measuredItemFaostat_L2",
                                                                                            "faostat", "measuredElementFaostat")]

FBSfaostat <- rbind(FBSfaostat1, FBSfaostat0[measuredElementFaostat == '511', -'measuredElementSuaFbs', with = FALSE])
setnames(FBSfaostat, c("faostat", "measuredElementFaostat"), c("element_description", "measuredElementSuaFbs"))
faostatFBS2save <- FBSfaostat[ , .(geographicAreaM49_fi, timePointYears, measuredItemFaostat_L2,
                                   measuredElementSuaFbs, Value, flagObservationStatus, flagMethod)]
faostatFBS2save <- faostatFBS2save[!is.na(Value)]

message("fi_SUA-FBS: Saving FAOSTAT standardized data")
CONFIG4 <- GetDatasetConfig(sessionKey_fbsFaostat@domain, sessionKey_fbsFaostat@dataset)

statsFaostat <- SaveData(domain = CONFIG4$domain,
                         dataset = CONFIG4$dataset,
                         data = faostatFBS2save, waitTimeout = Inf)

msg2email10 <- paste0("Standardization process completed successfully! ",
                      statsFaostat$inserted, " observations written, ",
                      statsFaostat$ignored, " weren't updated, ",
                      statsFaostat$discarded, " had problems.")

The FIAS standardization is slightly different from the Faostat one and use its own data table to aggregate elements. From the SUA with standardized quantities, input element for meal products are taken aside. All other elements, but population, are summed by major groups and then meals and population are added again. The major group code is replaced to the correspondent primary product with the ‘faostatl2_to_faostatl1’ data table.


# -- FIAS ----

# Aggregate SUA by parent code take away meals
# SUA with elements and groups to aggregates
SUAstandAggrFias0 <- SUAstand[ !measuredItemFaostat_L2 %in% mealCodes , .(measuredElementSuaFbs, geographicAreaM49_fi, timePointYears,
                                                                          flagObservationStatus, flagMethod, parent, Value_stand)]

# take only input element 5302 for meal products
mealsInput0 <- SUA2save[measuredElementSuaFbs == '5302' & measuredItemFaostat_L2 %in% mealCodes , .(measuredElementSuaFbs, geographicAreaM49_fi, timePointYears,
                                                                                                    flagObservationStatus, flagMethod, 
                                                                                                    measuredItemFaostat_L2, Value)]

mealsInput <- merge(mealsInput0, unique(tree[parent %in% primary, .(parent, child)]), 
                    by.x = 'measuredItemFaostat_L2', by.y = 'child', all.x = TRUE)

mealsInput[ , measuredItemFaostat_L2 := NULL]
setnames(mealsInput, c("parent"), c("measuredItemFaostat_L2"))

# SUAstandAggrFias0$flagObservationStatus <- factor(SUAstandAggrFias0$flagObservationStatus, levels = c('M', 'O', 'N', '', 'X', 'T', 'E', 'I'), ordered = TRUE)

SUAstandAggrFias1 <- SUAstandAggrFias0[measuredElementSuaFbs != '511' , list(Value = sum(Value_stand, na.rm = TRUE),
                                                                             flagObservationStatusAggr = 'I',
                                                                             flagMethodAggr = 's'), by = c("measuredElementSuaFbs", "geographicAreaM49_fi",
                                                                                                           "timePointYears", "parent")]

setnames(SUAstandAggrFias1, c('parent', 'flagObservationStatusAggr', 'flagMethodAggr'), 
         c('measuredItemFaostat_L2', 'flagObservationStatus', 'flagMethod'))

Pop2SUAaggFias <- SUAstandAggrFias0[measuredElementSuaFbs == '511']
setnames(Pop2SUAaggFias,c('Value_stand', 'parent'), c('Value', 'measuredItemFaostat_L2'))
SUAstandAggrFias <- rbind(SUAstandAggrFias1, Pop2SUAaggFias)

fiasFbsTot <- rbind(SUAstandAggrFias, mealsInput)
IcsGroups <- unique(fiasFbsTot[ , .(geographicAreaM49_fi, timePointYears, measuredItemFaostat_L2)])

# Introduce population data
pop2merge <- merge(popSWS, IcsGroups, by = c("geographicAreaM49_fi", "timePointYears"), all = TRUE)
fiasfbsPOP <- rbind(fiasFbsTot, pop2merge)

# map for FBS groups
sua_fbs_mapping <- ReadDatatable('faostatl2_to_faostatl1')
setnames(sua_fbs_mapping,  'measureditemfaostat_l2', 'measuredItemFaostat_L2' )
sua_fbs_mapping[ , label := NULL]

fbsFaostatL1 <- merge(fiasfbsPOP, sua_fbs_mapping, by = "measuredItemFaostat_L2")
fbsFaostatL1[ , measuredItemFaostat_L2 := NULL]
setnames(fbsFaostatL1, "fbs", "measuredItemFaostat_L2")

The FBS elements required by the FIAS Methodology are stored in the ‘fi_fias_standardization_element’ data table. The data table is merged with the fbsFaostatL1 object excluding the elements that do not belong to the FBS (calories/year ,‘261’; proteins/year, ‘271’; fats/year, ‘281’). Population is then excluded as well and all the other products are aggregated by the new FBS elements. Population is then appended again to the data, metadata about the WPP are included also in this dataset and data are saved back into SWS.


#-- FIAS FBS standardization ---- 
message("fi_SUA-FBS: Starting Fias standardization")
fiasGroups <- ReadDatatable('fi_fias_standardization_element')
setnames(fiasGroups, c('measuredelementsuafbs', 'measuredelementfias'), c('measuredElementSuaFbs', 'measuredElementFias'))

FBSfias0 <- merge(fbsFaostatL1[!measuredElementSuaFbs %in% c('261', '271', '281')], 
                  fiasGroups, by = "measuredElementSuaFbs")
FBSfias1 <- FBSfias0[measuredElementFias != '511' , list(Value = sum(Value, na.rm = TRUE),
                                                         flagObservationStatus = 'I',
                                                         flagMethod = 's'), by = c("geographicAreaM49_fi",
                                                                                   "timePointYears", "measuredItemFaostat_L2",
                                                                                   "fias", "measuredElementFias")]

fiasPop <- FBSfias0[measuredElementFias == '511', -'measuredElementSuaFbs', with = FALSE]
setkey(fiasPop)
fiasPop <- unique(fiasPop)

FBSfias <- rbind(FBSfias1, fiasPop)
fiasFBS2save <-  FBSfias[, .(geographicAreaM49_fi, timePointYears, measuredItemFaostat_L2, 
                             measuredElementFias, Value, flagObservationStatus, flagMethod)]

setnames(fiasFBS2save, "measuredElementFias", "measuredElementSuaFbs")

metadataGroups <- unique(fiasFBS2save[ , .(geographicAreaM49_fi, timePointYears, measuredItemFaostat_L2)])
Metadata2includeFias <- merge(sourceMetaData, metadataGroups, by = c("geographicAreaM49_fi", "timePointYears"), 
                              all = TRUE, allow.cartesian = TRUE)

message("fi_SUA-FBS: Saving FIAS standardized data")
CONFIG3 <- GetDatasetConfig(sessionKey_fbsFias@domain, sessionKey_fbsFias@dataset)

statsFias <- SaveData(domain = CONFIG3$domain,
                      dataset = CONFIG3$dataset,
                      data = fiasFBS2save,
                      metadata = Metadata2includeFias,
                      waitTimeout = Inf)

msg2email9 <- paste0("Standardization process completed successfully! ",
                     statsFias$inserted, " observations written, ",
                     statsFias$ignored, " weren't updated, ",
                     statsFias$discarded, " had problems.")

Food processing problems are saved in the appropriate datatables and also sent as attachment via email to the user along with other saved messages along the plugin.

R_SWS_SHARE_PATH <- Sys.getenv("R_SWS_SHARE_PATH")

length(FPproblems)

primfp <- ReadDatatable('fi_fp_imb_primary', readOnly = F)
secfp <- ReadDatatable('fi_fp_imb_sec', readOnly = F)
tertfp <- ReadDatatable('fi_fp_imb_ter', readOnly = F)
quatfp <- ReadDatatable('fi_fp_imb_quat', readOnly = F)
sectot <- ReadDatatable('fi_fp_imb_sec_tot', readOnly = F)
ncfp <- ReadDatatable('fi_fp_not_covered', readOnly = F)

chn1 <- Changeset('fi_fp_imb_primary')
chn2<- Changeset('fi_fp_imb_sec')
chn3 <- Changeset('fi_fp_imb_ter')
chn4 <- Changeset('fi_fp_imb_quat')
chn5 <- Changeset('fi_fp_imb_sec_tot')
chn6 <- Changeset('fi_fp_not_covered')

AddDeletions(chn1, primfp[geographicaream49_fi %in% sessionCountry])
Finalize(chn1)

AddDeletions(chn2, secfp[geographicaream49_fi %in% sessionCountry])
Finalize(chn2)

AddDeletions(chn3, tertfp[geographicaream49_fi %in% sessionCountry])
Finalize(chn3)

AddDeletions(chn4, quatfp[geographicaream49_fi %in% sessionCountry])
Finalize(chn4)

AddDeletions(chn5, sectot[geographicaream49_fi %in% sessionCountry])
Finalize(chn5)

AddDeletions(chn6, ncfp[geographicaream49_fi %in% sessionCountry])
Finalize(chn6)


chn11 <- Changeset('fi_fp_imb_primary')
chn22 <- Changeset('fi_fp_imb_sec')
chn33 <- Changeset('fi_fp_imb_ter')
chn44 <- Changeset('fi_fp_imb_quat')
chn55 <- Changeset('fi_fp_imb_sec_tot')
chn66 <- Changeset('fi_fp_not_covered')

if(exists('primary', where = FPproblems)){
  if(nrow(FPproblems$primary)>0){
  names(FPproblems$primary) <- tolower(names(FPproblems$primary))
  AddInsertions(chn11, FPproblems$primary[geographicaream49_fi %in% sessionCountry])
  Finalize(chn11)
  }
}

if(exists('secondary', where = FPproblems)){
  if(nrow(FPproblems$secondary)>0){
  names(FPproblems$secondary) <- tolower(names(FPproblems$secondary))
  AddInsertions(chn22, FPproblems$secondary[geographicaream49_fi %in% sessionCountry])
  Finalize(chn22)
  }
}

if(exists('tertiary', where = FPproblems)){
  if(nrow(FPproblems$tertiary)>0){
  names(FPproblems$tertiary) <- tolower(names(FPproblems$tertiary))
  AddInsertions(chn33, FPproblems$tertiary[geographicaream49_fi %in% sessionCountry])
  Finalize(chn33)
  }
}

if(exists('quaternary', where = FPproblems)){
  if(nrow(FPproblems$quaternary)>0){
  names(FPproblems$quaternary) <- tolower(names(FPproblems$quaternary))
  AddInsertions(chn44, FPproblems$quaternary[geographicaream49_fi %in% sessionCountry])
  Finalize(chn44)
  }
}

if(exists('secondaryTot', where = FPproblems)){
  if(nrow(FPproblems$secondaryTot)>0){
  names(FPproblems$secondaryTot) <- tolower(names(FPproblems$secondaryTot))
  AddInsertions(chn55, FPproblems$secondaryTot[geographicaream49_fi %in% sessionCountry])
  Finalize(chn55)
  }
}

if(exists('NotCovered', where = FPproblems)){
  if(nrow(FPproblems$NotCovered)>0){
  names(FPproblems$NotCovered) <- tolower(names(FPproblems$NotCovered))
  AddInsertions(chn66, FPproblems$NotCovered[geographicaream49_fi %in% sessionCountry])
  Finalize(chn66)
  }
}


##-- send Email with notification of correct execution ----

emailtext <- paste("The plug-in has saved the data in your sessions. The plugin returned the following messages:", 
      msg2email1, 
      msg2email1CDB,
      msg2email2, 
      msg2email7, 
      msg2email8, 
      msg2email9, 
      msg2email10,
      collapse = '\n')

primaryUnb <- data.table()
if(nrow(primaryneg) > 0 ){
  primaryUnb <- countriesneg
}

secondaryUnb <- data.table()
if(nrow(secondaryneg) > 0){
  secondaryUnb <- countriessecneg
}

from = "sws@fao.org"
to = swsContext.userEmail
subject = "fi_SUA-FBS plug-in has correctly run"
body = list(emailtext, mime_part(primaryUnb), mime_part(secondaryUnb))
sendmailR::sendmail(from = from, to = to, subject = subject, msg = body)
paste0("Email sent to ", swsContext.userEmail)

2.2 External functions

The three external functions used in the plugin have as goal: computing the extraction rate, computing the input element and computing the food processing element before the balancing process.

2.2.1 Extraction rates

The function to compute extraction rates has 4 parameters:

  • data: the current structure of the SUA.

  • tree: the standard commodity tree with default extraction rates

  • oldEr: previous extraction rate figures pulled from the validated dataset

  • years: the time series selected

In the shiny app there is also the ‘primary’ parameters, a vector containing the code of the primary items. The functions goes gradually from the best way to calculating the extraction rate, to the default value contained in the commodity tree.

Firstly the function checks if there are input elements available. If so, the extraction rate is calculated as a ration between the production and the input figures, flagged (I, i). If no input value is available then only zero missing values can be assigned to the object input2Er used in the following steps.

The code then checks if there is a non-null and non-empty object with past extraction rates. If values are available for the time series then the value is assigned, otherwise the extraction rates are used to perform a carry-forward process, i.e. missing extraction rates are imputed equal to the previous year one and flagged (E, t). These value are stored into the ErFromData object. If no information was retrieved for either of the previous attempts the default extraction rate contained into the commodity tree is assigned.

The computed extraction rates are appended to the rest of the data.

#-- Extraction rates calculation ----

eRcomputation <- function(data,  tree, 
                          oldEr = NULL, years){
  
  newYear <- max(as.numeric(years))
  ## the principal ingredient to compute the foodProcessing is the extraction rates
  ## for those items that already have a protected figures for 31 the default extractio rate 
  ## in the commodity tree has to be updated:
  
  dataProd <- data[measuredElementSuaFbs=="5510"]
  dataInput <- data[measuredElementSuaFbs=="5302"]
  
  # Take all secondary products
  Er0 <- unique(data[ !measuredItemFaostat_L2 %in% primary, .(geographicAreaM49_fi, timePointYears,  measuredItemFaostat_L2, availability)])
  Er0[ , measuredElementSuaFbs := '5423']
  
  if(nrow(dataInput) > 0){
    computeExtractionRate <- merge(dataProd, dataInput, 
                                   by = c("geographicAreaM49_fi", "timePointYears", 
                                          "measuredItemFaostat_L2", "availability"), 
                                   suffixes = c("_prod","_input"))
    computeExtractionRate[ , extraction_rate_C := Value_prod/Value_input]
     
    setnames(computeExtractionRate,c("extraction_rate_C"), "Value")
    computeExtractionRate <- computeExtractionRate[, .(geographicAreaM49_fi, timePointYears, measuredItemFaostat_L2, availability, Value)]
    computeExtractionRate[ , ':=' (flagObservationStatus = "I", flagMethod = "i" ) ] 
    
    input2Er <- merge(Er0, computeExtractionRate, by = c("geographicAreaM49_fi", "timePointYears",
                                                          "measuredItemFaostat_L2", "availability"), all.x = TRUE)
  } else {
    # If no Input then nothing happens
    input2Er <- Er0[ , ':=' (Value = 0, flagObservationStatus = "M", flagMethod = "u" ) ] 
  }
  
  # Historical data
  if(!is.null(oldEr)){
    
    if(nrow(oldEr) > 0){
    
    old2newEr <- merge(input2Er, oldEr, by = c('geographicAreaM49_fi', 'measuredElementSuaFbs',
                                            'measuredItemFaostat_L2', 'timePointYears'), 
                       all.x = TRUE, suffixes = c('', 'Series'))
    
    old2newEr[is.na(Value) | Value == 0, c('Value', 
                                           'flagObservationStatus',
                                           'flagMethod') := list(ValueSeries,
                                                                 flagObservationStatusSeries,
                                                                 flagMethodSeries)]
    
    old2newEr[ , c("ValueSeries", "flagObservationStatusSeries",
                 "flagMethodSeries") := NULL]
    
    # Carry forward are flagged as (E, t)
    carryforward <- data.table()
    for(i in 1:length(years)){
      previous <- oldEr[timePointYears == as.character(as.numeric(years[i])-1)]
      missing <- old2newEr[timePointYears == years[i]][is.na(Value) | Value == 0]
      
      # Only previous available values taken
      carryforward2append <- merge(previous, missing, by = c('geographicAreaM49_fi',
                                                             'measuredElementSuaFbs',
                                                             'measuredItemFaostat_L2'),
                                   suffixes = c('Prev', ''))
      carryforward2append[ , c('Value', 
                               'flagObservationStatus',
                               'flagMethod') := list(ValuePrev,
                                                     flagObservationStatusPrev,
                                                     flagMethodPrev)]
      carryforward2append[ , names(carryforward2append)[grepl('Prev', names(carryforward2append))] := NULL]
      
      carryforward <- rbind(carryforward, carryforward2append)
      }
    
    ErFromData <- merge(old2newEr, carryforward, by = c('geographicAreaM49_fi',
                                                        'measuredElementSuaFbs',
                                                        'measuredItemFaostat_L2',
                                                        'timePointYears',
                                                        'availability'),
                        all.x = TRUE, suffixes = c('', 'CF'))
    
    ErFromData[is.na(Value) | Value == 0, c('Value', 
                                            'flagObservationStatus',
                                            'flagMethod') := list(ValueCF,
                                                                  'E', 't')]
    ErFromData[ , names(ErFromData)[grepl('CF', names(ErFromData))] := NULL]
    
  
    } else {
    ErFromData <- input2Er
  }
    
  } else{
    ErFromData <- input2Er
  }
  
  # Add default Er if still any missing
  DefEr <- merge(ErFromData, tree[!is.na(extraction_rate) , .(child, extraction_rate)], 
                       by.x = c("measuredItemFaostat_L2"), by.y = c("child"), all.x = TRUE)
  # Flag for default extraction rate is (I, c) = Imputed, Copied from elsewhere in the working system
  DefEr[is.na(Value) | Value == 0, c('Value', 
                                         'flagObservationStatus',
                                         'flagMethod') := list(extraction_rate,
                                                               'I', 'c')]
  DefEr[ , extraction_rate := NULL]
  
  eRadded <- DefEr[!is.na(Value)]
  setkey(eRadded)
  eRadded <- unique(eRadded)
  eRadded <- eRadded[, Value := round(Value, 6)]
  dataEr <- rbind(data[measuredElementSuaFbs != '5423'], eRadded)
  setkey(dataEr)
  dataEr <- unique(dataEr)
}

2.2.2 Input

The ‘inputComputation’ takes as unique input the data just returned by the ‘eRcomputation’ function. Once the extraction rates have been calculated, also the input elements can be. The process to compute the input element is opposite to the extraction rate. If Er = Prod/Input , Input = Prod/Er . If there are official input data they are taken aside. The input element is calculated for all products and then merged with the official ones. The official prevail on the calculated one and then the complete input data are appended to the other data.


#-- Input calculation ----

inputComputation <- function(data){ #, treeNewER){
 
  # Isolate needed elements
  # Previous input available
  input <-  data[ !measuredItemFaostat_L2 %in% primary & measuredElementSuaFbs == '5302', ]
  
  #Er and production needed to compute input if no previous data
  Er <- data[ !measuredItemFaostat_L2 %in% primary & measuredElementSuaFbs == '5423', ]
  Prod <- data[ !measuredItemFaostat_L2 %in% primary & measuredElementSuaFbs == '5510', ]
  
  # Calculate Input
  InputCalc <- merge(Prod, Er, by = c("geographicAreaM49_fi",
                                      "timePointYears", "measuredItemFaostat_L2",
                                      "availability"), suffixes = c("_prod", "_Er"))
  InputCalc <- InputCalc[!is.na(Value_Er)]
  if(nrow(InputCalc[is.na(Value_Er)]) > 0 ){
  message('Missing extraction rates for some Ics groups')
  }
  
  InputCalc[ , input := Value_prod / Value_Er]
  
  data_compute31 <- melt(InputCalc,
                       id.vars = c("geographicAreaM49_fi", "timePointYears",  "measuredItemFaostat_L2", "availability"),
                       measure.vars = "input",
                       value.name = "Value" ,
                       variable.name = "measuredElementSuaFbs", variable.factor = FALSE)

  
  data_compute31[measuredElementSuaFbs=="input",measuredElementSuaFbs:="5302"]
  data_compute31[ , ':='(flagObservationStatus = 'I', flagMethod = 'i', FBSsign = 0)]
  
  # See if any official input
  comp31 <- merge(data_compute31, input, by = c("geographicAreaM49_fi", 
                                "timePointYears",  
                                "measuredItemFaostat_L2", 
                                "availability", 
                                "measuredElementSuaFbs"), all = TRUE,
                  suff = c('', 'Official'))
  
  # If previous data is not NA then it is assigned as input 
  # Note: the Er should have been computed as ratio between Production and Input
  comp31[!is.na(ValueOfficial), c('Value','flagObservationStatus','flagMethod'):= list(ValueOfficial, 
                                                                                       flagObservationStatusOfficial,
                                                                                       flagMethodOfficial)]
  
  comp31 <- comp31[ , c('ValueOfficial', 'flagObservationStatusOfficial',
                       'flagMethodOfficial') := NULL]
  
  # Remove all input data from the original data and add the only data31 part
  # existing data are included as computed with computed extraction rates
  # other input data are computed starting from given extraction rates
  dataNo31 <- data[measuredElementSuaFbs!="5302"]
  SUAinput <- rbind(dataNo31, comp31[,.(geographicAreaM49_fi, timePointYears,  measuredItemFaostat_L2, availability, measuredElementSuaFbs, Value, flagObservationStatus, flagMethod)]) #rbind(data, data_compute31) #
  return(SUAinput)
  
}

2.2.3 Food Processing

The food processing function proceed by step, assigning the input to the highest rank product until the availability allows for. It takes as input the data with the calculated input, the new commodity tree with the updated extraction rates and the primary ICS code vector.

Fist of all, it calculates the whole quantity to cover in terms of primary equivalent by summing all the input elements by parent and compare the total with the availability of the correspondent primary product (or primary parent). If the total processing to cover is higher than the primary availability, then the processing for the primary parent is set equal to the availability.


#-- Food processing calculation ----

foodProcessingComputation <- function(SUAinput, treeNewER, primary){
  
  #-- Primary processing ----

  if(nrow(SUAinput[measuredElementSuaFbs %in% c('5510', '5302') &
                   measuredItemFaostat_L2 %in% treeNewER[weight == TRUE]$child]) == 0){
   
    data131full <- data.table(geographicAreaM49_fi = 0,
                              measuredItemFaostat_L2 = 0,
                              timePointYears = 0,
                              availability = 0,
                              Value = 0,
                              measuredElementSuaFbs = 0)
    problems <- list(primary = data.table(),
                     secondaryTot = data.table(),
                     secondary = data.table(),
                     tertiary = data.table(),
                     quaternary = data.table(),
                     NotCovered = data.table())
     
  } else {
  
  
  # Tree only with primary
  treePrimary <- treeNewER[parent %in%  primary]
  treePrimary[, child:=as.character(child)]
  
  data_compute131 <- copy(SUAinput)
  setnames(data_compute131, "measuredItemFaostat_L2", "child")
  data_compute131tree <- merge(data_compute131, treePrimary, by=c("geographicAreaM49_fi", "timePointYears", "child"))
  
  # sum all the inputs
  data_compute131tree <- data_compute131tree[ measuredElementSuaFbs=="5302" , processing:=sum(Value, na.rm = TRUE), by=c("geographicAreaM49_fi",
                                                                                                  "timePointYears",
                                                                                                  "parent")]
  
  # processing is unique for each primary parent so take only these values
  # data_compute131processing = Total input 5302 by parent
  setkey(data_compute131tree)
  data_compute131processing <- unique(data_compute131tree[!is.na(processing),.(geographicAreaM49_fi, parent, timePointYears, processing)])
  setnames(data_compute131processing, "parent", "measuredItemFaostat_L2")
  
  ## The total 'food processing' (input to cover) has been just computed for any Primary Parent, 
  ## we have to compare the 'new component' based on the sum of all the child-input
  ## with the actual primary availability, in order to be sure that the new availabily
  ## (computed including the just computed food processing in the primary SUA line) does not produce
  ## a negative imbalance.
  
  data131 <- merge(SUAinput, data_compute131processing, by=c( "geographicAreaM49_fi","measuredItemFaostat_L2","timePointYears"), all.x = TRUE)
  
  # take only the processing to check availability
  setkey(data131)
  data131 <- unique(data131[!is.na(processing),.(geographicAreaM49_fi, measuredItemFaostat_L2, timePointYears, availability, processing)])
  
  ## SeconLevelProcessing variable is computed to evaluate which primary availabilities are lower than the 
  ## food processing. 
  # secondLevelProcessing < 0 quantity left to cover (Not enough primary for all processing)
  # secondLevelProcessing > 0 primary availability exceeding processing
  data131[, secondLevelProcessing := availability-processing]
  
  # If not enough availability then the processing imputable to primary parent is put equal to the availability 
  data131[secondLevelProcessing < 0, processing := availability]

The object secondLevelProcessing is created containing the data for products the primary availability alone was not enough. The function controls the primary availability covers at least the rank 1 secondary parent. When this condition is not fulfilled the data are stored in the rank1availCheck object and will be reported to the user in the shiny app and in the final email.

Another check stored in the insufficiency object. In this object are stored products where the sum of all secondary availability is not enough to store the uncovered quantity. All the comparison need to be done in primary equivalent otherwise quantities tel-quel cannot be compared.

The products having enough availability are stored in a different object (toDeviateCovered) and processed.

   # Primary measuredItemFaostat_L2 from which not enough availability to cover processing
  secondLevelProcessing <- data131[secondLevelProcessing < 0]
  setnames(secondLevelProcessing, "measuredItemFaostat_L2", "parent")
  problems <- list(primary = data.table(),
                   secondaryTot = data.table(),
                   secondary = data.table(),
                   tertiary = data.table(),
                   quaternary = data.table(),
                   NotCovered = data.table())
  
  #-- Secondary processing  ----
  
  if(nrow(secondLevelProcessing) > 0) {
    
    # merge by parent so that secondary parents can be found
    # toDeviate has dimensions: (parent_primary, geographicAreaM49_fi, timePointYears,
    #   availability, processing, secondLevelProcessing, parent_secondary, weight, extraction_rate, rank)
    
    toDeviate <- merge(secondLevelProcessing, treeNewER[weight != FALSE], by=c("geographicAreaM49_fi", "timePointYears", "parent"), allow.cartesian = TRUE)
    setnames(toDeviate, c("parent","child"), c("parent_primary","parent_secondary"))
    
    # # tree only with possible secondary parents
    # secondary <- treeNewER[!parent %in% primary, unique(parent)]
    # 
    # # Keep a child only if it can also be a (secondary) parent
    # toDeviate <- toDeviate[parent_secondary %in% secondary]
    
    # compare imbalance with rank 1 parent availability
    toDeviate2proc <- merge(toDeviate, SUAinput, by.x = c("geographicAreaM49_fi", "timePointYears",  "parent_secondary"),
                            by.y = c("geographicAreaM49_fi", "timePointYears",  "measuredItemFaostat_L2"), suffixes = c('_primary','_secondary'))
    
    # Make sure primary availability covers at least the rank 1 child
    rank1availCheck <- toDeviate2proc[parent_secondary %in%  unique(treeNewER[rank == 1 ]$parent) &
                                        measuredElementSuaFbs == '5302' & 
                                        Value > availability_primary ]
    
    if(nrow(rank1availCheck) > 0){
      msg1 <- paste('Not enough primary availability for groups: ', paste(unique(rank1availCheck$parent_primary), collapse = ","),
                    'for years: ', paste(unique(rank1availCheck$timePointYears), collapse = ","), 
                    'and country', paste(unique(rank1availCheck$geographicAreaM49_fi), collapse = ","))
      message(msg1)
      rank1availCheck
      
    }               
    
    msg1 <- ifelse(nrow(rank1availCheck) > 0, msg1, 'Primary products cover at least first rank children (see commodity tree).')
    toDeviate2proc2 <- unique(toDeviate2proc[, .( geographicAreaM49_fi, timePointYears, parent_secondary, parent_primary, 
                                                  availability_primary, processing, secondLevelProcessing, weight, 
                                                  extraction_rate, availability_secondary)])
    # problems$primary <- list(msg = msg1, tab = rank1availCheck[ , .(geographicAreaM49_fi,
    #                                                                 timePointYears,
    #                                                                 parent_secondary,
    #                                                                 parent_primary,
    #                                                                 availability_primary,
    #                                                                 Value)])
    problems$primary <- rank1availCheck[ , .(geographicAreaM49_fi,
                                             timePointYears,
                                             parent_secondary,
                                             parent_primary,
                                             availability_primary,
                                             Value)]
    
    # If availability for secondary products was negative then production has been artificially increased so
    # secondary availability is 0 and not negative
    #toDeviate2proc2[availability_secondary < 0, availability_secondary := 0]
    # Check if enough secondary availability to cover unbalance
    toDeviate2proc2[extraction_rate != 0 & !is.na(extraction_rate), availability_secondary_primEq := availability_secondary/extraction_rate ]
    toDeviate2proc2[parent_secondary %in% unique(treeNewER$parent), availability_secondary_primEq_tot := sum(availability_secondary_primEq, na.rm = TRUE), by = list(geographicAreaM49_fi,
                                                                                                                                                                     timePointYears,
                                                                                                                                                                     parent_primary) ]
    checkingTab <- unique(toDeviate2proc2[ , .(geographicAreaM49_fi, timePointYears, parent_primary, availability_primary, 
                                               secondLevelProcessing, availability_secondary_primEq_tot)])
    
    add2NotCovered <- toDeviate2proc2[availability_secondary_primEq_tot < (-1)*secondLevelProcessing ]
    
    add2NotCovered <- add2NotCovered[ , .(geographicAreaM49_fi, parent_primary, timePointYears,
                                          secondLevelProcessing,
                                          extraction_rate)]
    
    add2NotCovered <- add2NotCovered[ , secondLevelProcessing := -secondLevelProcessing]
    add2NotCovered <- add2NotCovered[ , foodProcessingSecondary := 0]
    setkey(add2NotCovered)
    add2NotCovered <- unique(add2NotCovered)
    # -- General insufficiency ----
    insufficiency <- checkingTab[availability_secondary_primEq_tot < (-1)*secondLevelProcessing ]
    
    if(nrow(insufficiency) > 0){
      
      problem <- 'PROBLEM! Not enough secondary availability to cover production!'
      msg2 <- list(problem,insufficiency)
      
    }
    msg2 <- ifelse(nrow(insufficiency) > 0, msg2, 'Total secondary availability can cover remaining secondary production. Level by level availability to be checked.')
    
    # problems$secondaryTot <- list(msg = msg2, tab = insufficiency[ , .(geographicAreaM49_fi,
    #                                                                    timePointYears,
    #                                                                    parent_primary,
    #                                                                    AvailableQuantity = (availability_secondary_primEq_tot),
    #                                                                    Quantity2cover = (secondLevelProcessing)*(-1))])
    
    problems$secondaryTot <- insufficiency[ , .(geographicAreaM49_fi,
                                                timePointYears,
                                                parent_primary,
                                                AvailableQuantity = (availability_secondary_primEq_tot),
                                                Quantity2cover = (secondLevelProcessing)*(-1))]
    
    ##############################################################  
    #-- NEW PART ----  
    # Put unrealistic values to primary products for which there is not enough availability to cover production
    # data131covered <- merge(data131, insufficiency[ , .(geographicAreaM49_fi, timePointYears, parent_primary, availability_primary)],
    #                         by.x = c('geographicAreaM49_fi', 'measuredItemFaostat_L2', 'timePointYears'),
    #                         by.y = c('geographicAreaM49_fi', 'parent_primary', 'timePointYears'),
    #                         all = TRUE)
    # data131covered[!is.na(availability_primary), processing := -9999]
    
    # Exclude the products that cannot be covered
    toDeviateCovered <- merge(toDeviate2proc2[, .(geographicAreaM49_fi,
                                                  timePointYears,
                                                  parent_primary,
                                                  parent_secondary, 
                                                  secondLevelProcessing,
                                                  extraction_rate,
                                                  availability_secondary, 
                                                  availability_secondary_primEq)], 
                              insufficiency[ , .(geographicAreaM49_fi, timePointYears, parent_primary, availability_primary)], 
                              by = c('geographicAreaM49_fi', 'timePointYears', 'parent_primary'), all = TRUE)
    
    toDeviateCovered <- toDeviateCovered[is.na(availability_primary)] ### !!!!
    toDeviateCovered[ , availability_primary := NULL]
    setkey(toDeviateCovered)
    toDeviateCovered <- unique(toDeviateCovered)
    #######################################################################

The data to process will go through the four level of possible parent (the rank is a parameter included in the commodity tree). The secondaryFP object is created and the function goes ranks by rank, allocating the maximum input elements to the highest rank parent until the end of the input quantity.

At each level, the function checks if there is at least enough availability for one parent to cover the following products in rank, if not this is stored in the problems list. Each time from primary equivalent the food processing element is transformed into specific product weight with the extraction rates.


   # Consider all ICS product that can be parent
    secondaryFP <-  merge(unique(treeNewER[, .(parent, rank)]), toDeviateCovered,
                          by.x = c("parent"), 
                          by.y = c("parent_secondary"))
    
    # transform in secondary equivalent the excess of processing not covered by the primary
    # secondaryFP[, secondLevelProcessing:= (secondLevelProcessing * extraction_rate)*(-1)]
    secondaryFP[, secondLevelProcessing:= (secondLevelProcessing)*(-1)]
    setkey(secondaryFP)
    secondaryFP <- unique(secondaryFP)
    
    # Maximum level of processing is 4 so creating space for it
    secondaryFP[ , ':=' (rank1Processing = 0, thirdLevelProcessing = 0, 
                         rank2Processing = 0, fourthLevelProcessing = 0, 
                         rank3Processing = 0, fifthLevelProcessing = 0, rank4Processing = 0)]
    
    
    #-- Rank 1 secondary parent ----
    
    # Calculation of first secondary parent food processing already in secondary equivalent
    # rank1Processing = quantity of input covered by rank 1 parent
    secondaryFP[ rank == 1, rank1Processing := ifelse(availability_secondary_primEq  > secondLevelProcessing,
                                                      secondLevelProcessing*extraction_rate , availability_secondary) ]
    
    # Where there is need of second secondary parent
    rank2needed <- unique(secondaryFP[rank == 1 & availability_secondary_primEq  < secondLevelProcessing, .(geographicAreaM49_fi, parent, timePointYears, parent_primary, secondLevelProcessing, availability_secondary_primEq) ])
    
    #-- Rank 2 secondary parent ----
    
    if(nrow(rank2needed) > 0){
      rank2needed[ , thirdLevelProcessing := secondLevelProcessing - (availability_secondary_primEq )]
      
      # Merge to put the right third level processing
      secondaryFPneeded <- merge(secondaryFP[ rank == 2], rank2needed, 
                                 by = c("geographicAreaM49_fi", "timePointYears", "parent_primary"), all = TRUE, suffixes = c('_available', '_needed'))
      
      secondaryFPneeded[ is.na(thirdLevelProcessing_needed), thirdLevelProcessing_needed := 0 ]
      
      unavailable <- secondaryFPneeded[is.na(parent_available)]
      
      if(nrow(unavailable) > 0){
        msg3 <- paste('Not enough product to cover processing for primary group(s)', paste(unique(unavailable$parent_primary), collapse = ", "),
                      "at level 2, for country", paste(unique(unavailable$geographicAreaM49_fi), collapse = ", ") )
        message(msg3)
      }
      msg3 <- ifelse(nrow(unavailable) > 0, msg3, 'Enough rank 2 secondary parent to cover rank 3 children (see commodity tree).')
      
      # problems$secondary <- list(msg = msg3, tab = unavailable[ , .(geographicAreaM49_fi,
      #                                                               timePointYears,
      #                                                               parent_primary,
      #                                                               parent_needed,
      #                                                               Quantity2cover = (thirdLevelProcessing_needed))] )
      # 
      problems$secondary <- unavailable[ , .(geographicAreaM49_fi,
                                             timePointYears,
                                             parent_primary,
                                             parent_needed,
                                             Quantity2cover = (thirdLevelProcessing_needed))]
      
      secondaryFP2 <- copy(secondaryFPneeded)
      secondaryFP2[is.na(parent_available), availability_secondary_primEq_available := 0]
      
      
      secondaryFP2 <- secondaryFP2[ ,rank2Processing := ifelse(availability_secondary_primEq_available  > thirdLevelProcessing_needed, 
                                                               thirdLevelProcessing_needed *extraction_rate, 
                                                               availability_secondary_primEq_available *extraction_rate)]
      
      secondaryFP2 <- secondaryFP2[  , .(geographicAreaM49_fi, timePointYears, parent_primary, parent_available, rank,
                                         secondLevelProcessing_available, extraction_rate, availability_secondary,
                                         availability_secondary_primEq_available, rank1Processing,
                                         rank2Processing, fourthLevelProcessing, rank3Processing, fifthLevelProcessing, rank4Processing,
                                         thirdLevelProcessing_needed)]
      setnames(secondaryFP2, names(secondaryFP2), gsub("_available", "", names(secondaryFP2)))
      setnames(secondaryFP2, 'thirdLevelProcessing_needed', 'thirdLevelProcessing')
      secondaryFP2 <- rbind(secondaryFP2, secondaryFP[rank != 2, ])   
      secondaryFP <- secondaryFP2
      # Where there is need of third secondary parent
      
      rank3needed <- unique(secondaryFP2[rank == 2 & availability_secondary_primEq  < thirdLevelProcessing, .(geographicAreaM49_fi, parent, timePointYears, parent_primary, thirdLevelProcessing, availability_secondary_primEq) ])
      
      #-- Rank 3 secondary parent ----
      
      if(nrow(rank3needed) > 0){
        rank3needed[ , fourthLevelProcessing := thirdLevelProcessing - (availability_secondary_primEq )]
        
        tertiaryFPneeded <- merge(secondaryFP2[ rank == 3], rank3needed, 
                                  by = c("geographicAreaM49_fi", "timePointYears", "parent_primary"), all = TRUE, suffixes = c('_available', '_needed'))
        
        tertiaryFPneeded[is.na(fourthLevelProcessing_needed), fourthLevelProcessing_needed := 0 ]
        
        unavailable3 <- tertiaryFPneeded[is.na(parent_available)]
        
        if(nrow(unavailable3) > 0){
          msg4 <- paste('Not enough product to cover processing for primary group(s)', paste(unique(unavailable3$parent_primary), collapse = ", "),
                        "at level 3, for country ", paste(unique(unavailable3$geographicAreaM49_fi ), collapse = ", "))
          message(msg4)
        }
        msg4 <- ifelse(nrow(unavailable3) > 0, msg4, 'Enough rank 3 secondary parent to cover rank 4 children (see commodity tree).')
        
        # problems$tertiary <- list(msg = msg4, tab = unavailable3[ , .(geographicAreaM49_fi,
        #                                                               timePointYears,
        #                                                               parent_primary,
        #                                                               parent_needed,
        #                                                               Quantity2cover = (fourthLevelProcessing_needed))] )
        # 
        problems$tertiary <- unavailable3[ , .(geographicAreaM49_fi,
                                               timePointYears,
                                               parent_primary,
                                               parent_needed,
                                               Quantity2cover = (fourthLevelProcessing_needed))]

        
        secondaryFP3 <- copy(tertiaryFPneeded)
        
        secondaryFP3[is.na(parent_available), availability_secondary_primEq_available := 0]
        
        secondaryFP3 <- secondaryFP3[ ,rank3Processing := ifelse(availability_secondary_primEq_available  > fourthLevelProcessing_needed, 
                                                                 fourthLevelProcessing_needed *extraction_rate, 
                                                                 availability_secondary_primEq_available *extraction_rate)]
        
        secondaryFP3 <- secondaryFP3[  , .(geographicAreaM49_fi, timePointYears, parent_primary, parent_available, rank,
                                           secondLevelProcessing, extraction_rate, availability_secondary,
                                           availability_secondary_primEq_available, rank1Processing, thirdLevelProcessing_available,
                                           rank2Processing, rank3Processing, fifthLevelProcessing, rank4Processing,
                                           fourthLevelProcessing_needed)]
        setnames(secondaryFP3, names(secondaryFP3), gsub("_available", "", names(secondaryFP3)))
        setnames(secondaryFP3, 'fourthLevelProcessing_needed', 'fourthLevelProcessing')
        secondaryFP3 <- rbind(secondaryFP3, secondaryFP2[rank != 3, ])   
        secondaryFP <- secondaryFP3
        
        # Where there is need of fourth secondary parent
        
        rank4needed <- unique(secondaryFP3[rank == 3 & availability_secondary_primEq  < fourthLevelProcessing, .(geographicAreaM49_fi, parent, timePointYears, parent_primary, fourthLevelProcessing, availability_secondary_primEq) ])
        
        #-- Rank 4 secondary parent ----
        if(nrow(rank4needed) > 0 ){
          rank4needed[ , fifthLevelProcessing := fourthLevelProcessing - (availability_secondary_primEq )]
          
          quaternaryFPneeded <- merge(secondaryFP3[ rank == 4], rank4needed, 
                                      by = c("geographicAreaM49_fi", "timePointYears", "parent_primary"), all = TRUE, suffixes = c('_available', '_needed'))
          
          quaternaryFPneeded[ is.na(fifthLevelProcessing_needed), fifthLevelProcessing_needed := 0 ]
          
          unavailable4 <- quaternaryFPneeded[is.na(parent_available)]
          
          if(nrow(unavailable4) > 0){
            msg5 <- paste('Not enough product to cover processing for primary group(s)', paste(unique(unavailable4$parent_primary), collapse = ", "),
                          "at level 4, for country ", paste(unique(unavailable4$geographicAreaM49_fi), collapse = ", "))
            message(msg5)
          }
          msg5 <- ifelse(nrow(unavailable4) > 0, msg5, 'Enough rank 4 secondary parent to cover remaining children (see commodity tree).')
          
          # problems$quaternary <- list(msg = msg5, tab =unavailable4[ , .(geographicAreaM49_fi,
          #                                                                timePointYears,
          #                                                                parent_primary,
          #                                                                parent_needed,
          #                                                                Quantity2cover = (fifthLevelProcessing_needed))])
          
          problems$quaternary <- unavailable4[ , .(geographicAreaM49_fi,
                                                   timePointYears,
                                                   parent_primary,
                                                   parent_needed,
                                                   Quantity2cover = (fifthLevelProcessing_needed))]
          
          
          secondaryFP4 <- copy(quaternaryFPneeded)
          secondaryFP4[is.na(parent_available), availability_secondary_primEq_available := 0]
          
          secondaryFP4 <- secondaryFP4[ ,rank4Processing := ifelse(availability_secondary_primEq_available  > fifthLevelProcessing_needed, 
                                                                   fifthLevelProcessing_needed *extraction_rate, 
                                                                   availability_secondary_primEq_available *extraction_rate)]
          
          secondaryFP4 <- secondaryFP4[  , .(geographicAreaM49_fi, timePointYears, parent_primary, parent_available, rank,
                                             secondLevelProcessing, extraction_rate, availability_secondary,
                                             availability_secondary_primEq_available, rank1Processing, thirdLevelProcessing,
                                             rank2Processing, fourthLevelProcessing_available, rank3Processing, 
                                             fifthLevelProcessing_needed, rank4Processing)]
          setnames(secondaryFP4, names(secondaryFP4), gsub("_available", "", names(secondaryFP4)))
          setnames(secondaryFP4, 'fifthLevelProcessing_needed', 'fifthLevelProcessing')
          secondaryFP4 <- rbind(secondaryFP4, secondaryFP3[rank != 4, ])   
          secondaryFP <- secondaryFP4
          
          
          rank5needed <- unique(secondaryFP4[rank == 4 & availability_secondary_primEq  < fifthLevelProcessing,  ])
          
        }
        
      }
      
    }
    

At the end of the process all the secondaryFP object is processed so to be appended to the other data and the uncovered data are stored in the problems list in the NotCovered table.


    #-- Organising data ----
    
    processingLevelsComputed <-  melt(secondaryFP, 
                                      id.vars = c("parent",
                                                  "rank", "geographicAreaM49_fi",
                                                  "timePointYears", "parent_primary",
                                                  "secondLevelProcessing", "extraction_rate",
                                                  "availability_secondary", "availability_secondary_primEq"),
                                      measure.vars = c("rank1Processing", "rank2Processing", "rank3Processing", "rank4Processing"),
                                      value.name = "foodProcessingSecondary",
                                      variable.name = "levelOfProcessing")
    
    NotCovered <- processingLevelsComputed[ , .(geographicAreaM49_fi, parent_primary, timePointYears, secondLevelProcessing, extraction_rate, foodProcessingSecondary)]
    setkey(NotCovered)
    NotCovered <- unique(NotCovered)
    NotCovered <- rbind(NotCovered, add2NotCovered)

    
    if(nrow(NotCovered) > 0){
      NotCovered[ , foodProcessingSecondary_primEq := foodProcessingSecondary/extraction_rate ]
      NotCovered[ , c('secondLevelProcessing',  'foodProcessingSecondary_primEq') := list(secondLevelProcessing,
                                                                                          sum(foodProcessingSecondary_primEq)),
                  by = c('geographicAreaM49_fi', 'parent_primary', 'timePointYears')]
      NotCovered[ , UncoveredQuantity := secondLevelProcessing - foodProcessingSecondary_primEq]
      NotCovered <- NotCovered[secondLevelProcessing > foodProcessingSecondary_primEq]
    }
    
    if(nrow(NotCovered) > 0){
      NotCovered <- NotCovered[ , c('secondLevelProcessing',  'foodProcessingSecondary', 'extraction_rate', 'foodProcessingSecondary_primEq') := NULL]
      setkey(NotCovered)
      NotCovered <- unique(NotCovered)
      problems$NotCovered <- NotCovered
    } else {
      problems$NotCovered <- data.table()
    }
    
    processingLevelsAssigned <- processingLevelsComputed[foodProcessingSecondary != 0]
    
    processingLevelsAssigned <- processingLevelsAssigned[ , .(geographicAreaM49_fi, parent, timePointYears, 
                                                              availability_secondary, foodProcessingSecondary)]
    
    setnames(processingLevelsAssigned, c("parent", "availability_secondary", "foodProcessingSecondary"), 
             c("measuredItemFaostat_L2", "availability", "processing"))
    
    data131full <- rbind(data131[ , secondLevelProcessing := NULL], processingLevelsAssigned)
  } else {
    
    data131full <- data131[ , secondLevelProcessing := NULL]
    
  } 
  
  
  data131full[, measuredElementSuaFbs:="5023"]
  setnames(data131full, "processing", "Value")
  
}
  
  return(list(result = data131full,
              problems = problems
  )
  )
  
}